hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [2/2] git commit: Implement Preemptive Fast Fail
Date Wed, 29 Oct 2014 05:48:09 GMT
Implement Preemptive Fast Fail

Summary: This diff ports the Preemptive Fast Fail feature to OSS. In multi threaded clients, we use a feature developed on 0.89-fb branch called Preemptive Fast Fail. This allows the client threads which would potentially fail, fail fast. The idea behind this feature is that we allow, among the hundreds of client threads, one thread to try and establish connection with the regionserver and if that succeeds, we mark it as a live node again. Meanwhile, other threads which are trying to establish connection to the same server would ideally go into the timeouts which is effectively unfruitful. We can in those cases return appropriate exceptions to those clients instead of letting them retry.

Test Plan: Unit tests

Differential Revision: https://reviews.facebook.net/D24177

Signed-off-by: stack <stack@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ece933fa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ece933fa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ece933fa

Branch: refs/heads/master
Commit: ece933fa3e272531ee443265c7aef7326e89e7cd
Parents: 95282f2
Author: manukranthk <manukranthk@fb.com>
Authored: Tue Sep 23 19:15:09 2014 -0700
Committer: stack <stack@apache.org>
Committed: Tue Oct 28 22:47:50 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClusterConnection.java  |  11 +
 .../hadoop/hbase/client/ConnectionAdapter.java  |   5 +
 .../hadoop/hbase/client/ConnectionManager.java  |  10 +-
 .../apache/hadoop/hbase/client/FailureInfo.java |  61 ++
 .../client/FastFailInterceptorContext.java      | 123 ++++
 .../org/apache/hadoop/hbase/client/HTable.java  |   2 +-
 .../client/NoOpRetryableCallerInterceptor.java  |  68 ++
 .../client/NoOpRetryingInterceptorContext.java  |  44 ++
 .../client/PreemptiveFastFailInterceptor.java   | 405 ++++++++++++
 .../hbase/client/RetryingCallerInterceptor.java |  98 +++
 .../RetryingCallerInterceptorContext.java       |  69 +++
 .../RetryingCallerInterceptorFactory.java       |  81 +++
 .../hadoop/hbase/client/RpcRetryingCaller.java  |  21 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |  15 +-
 .../exceptions/ConnectionClosingException.java  |  59 ++
 .../exceptions/PreemptiveFastFailException.java |  70 +++
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  |  17 +-
 .../client/TestFastFailWithoutTestUtil.java     | 613 +++++++++++++++++++
 .../org/apache/hadoop/hbase/HConstants.java     |  27 +
 .../hbase/client/HConnectionTestingUtility.java |   3 +
 .../hadoop/hbase/client/TestFastFail.java       | 313 ++++++++++
 21 files changed, 2102 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 528e7d0..398ecad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.RegionLocations;
@@ -270,4 +271,14 @@ public interface ClusterConnection extends HConnection {
    * @return Default AsyncProcess associated with this connection.
    */
   AsyncProcess getAsyncProcess();
+
+  /**
+   * Returns a new RpcRetryingCallerFactory from the given {@link Configuration}.
+   * This RpcRetryingCallerFactory lets the users create {@link RpcRetryingCaller}s which can be
+   * intercepted with the configured {@link RetryingCallerInterceptor}
+   * @param conf
+   * @return RpcRetryingCallerFactory
+   */
+  RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf);
 }
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index dc2dc29..927b56c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -440,4 +440,9 @@ class ConnectionAdapter implements ClusterConnection {
   public AsyncProcess getAsyncProcess() {
     return wrappedConnection.getAsyncProcess();
   }
+
+  @Override
+  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
+    return wrappedConnection.getNewRpcRetryingCallerFactory(conf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 45c32f0..b3a6295 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -583,6 +583,8 @@ class ConnectionManager {
 
     private RpcControllerFactory rpcControllerFactory;
 
+    private final RetryingCallerInterceptor interceptor;
+
     /**
      * Cluster registry of basic info such as clusterid and meta region location.
      */
@@ -613,7 +615,6 @@ class ConnectionManager {
       retrieveClusterId();
 
       this.rpcClient = new RpcClient(this.conf, this.clusterId);
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
 
       // Do we publish the status?
@@ -664,6 +665,8 @@ class ConnectionManager {
         this.nonceGenerator = new NoNonceGenerator();
       }
       this.asyncProcess = createAsyncProcess(this.conf);
+      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor);
     }
 
     @Override
@@ -2509,6 +2512,11 @@ class ConnectionManager {
         master.close();
       }
     }
+
+    @Override
+    public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
+      return RpcRetryingCallerFactory.instantiate(conf, this.interceptor);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java
new file mode 100644
index 0000000..9d685b8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Keeps track of repeated failures to any region server. Multiple threads manipulate the contents
+ * of this thread.
+ *
+ * Access to the members is guarded by the concurrent nature of the members inherently.
+ * 
+ */
+@InterfaceAudience.Private
+class FailureInfo {
+  // The number of consecutive failures.
+  public final AtomicLong numConsecutiveFailures = new AtomicLong();
+  // The time when the server started to become unresponsive
+  // Once set, this would never be updated.
+  public final long timeOfFirstFailureMilliSec;
+  // The time when the client last tried to contact the server.
+  // This is only updated by one client at a time
+  public volatile long timeOfLatestAttemptMilliSec;
+  // Used to keep track of concurrent attempts to contact the server.
+  // In Fast fail mode, we want just one client thread to try to connect
+  // the rest of the client threads will fail fast.
+  public final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
+      false);
+
+  @Override
+  public String toString() {
+    return "FailureInfo: numConsecutiveFailures = "
+        + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
+        + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
+        + timeOfLatestAttemptMilliSec
+        + " exclusivelyRetringInspiteOfFastFail  = "
+        + exclusivelyRetringInspiteOfFastFail.get();
+  }
+
+  FailureInfo(long firstFailureTime) {
+    this.timeOfFirstFailureMilliSec = firstFailureTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
new file mode 100644
index 0000000..9eb56bc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class FastFailInterceptorContext extends
+    RetryingCallerInterceptorContext {
+
+  // The variable that indicates whether we were able to connect with the server
+  // in the last run
+  private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(
+      false);
+
+  // The variable which indicates whether this was a retry or the first time
+  private boolean didTry = false;
+
+  // The failure info that is associated with the machine which we are trying to
+  // contact as part of this attempt.
+  private FailureInfo fInfo = null;
+
+  // Variable indicating that the thread that is currently executing the
+  // operation is in a mode where it would retry instead of failing fast, so
+  // that we can figure out whether making contact with the server is
+  // possible or not.
+  private boolean retryDespiteFastFailMode = false;
+
+  // The server that would be contacted to successfully complete this operation.
+  private ServerName server;
+
+  // The number of the retry we are currenty doing.
+  private int tries;
+
+  public MutableBoolean getCouldNotCommunicateWithServer() {
+    return couldNotCommunicateWithServer;
+  }
+
+  public FailureInfo getFailureInfo() {
+    return fInfo;
+  }
+
+  public ServerName getServer() {
+    return server;
+  }
+
+  public int getTries() {
+    return tries;
+  }
+
+  public boolean didTry() {
+    return didTry;
+  }
+
+  public boolean isRetryDespiteFastFailMode() {
+    return retryDespiteFastFailMode;
+  }
+
+  public void setCouldNotCommunicateWithServer(
+      MutableBoolean couldNotCommunicateWithServer) {
+    this.couldNotCommunicateWithServer = couldNotCommunicateWithServer;
+  }
+
+  public void setDidTry(boolean didTry) {
+    this.didTry = didTry;
+  }
+
+  public void setFailureInfo(FailureInfo fInfo) {
+    this.fInfo = fInfo;
+  }
+
+  public void setRetryDespiteFastFailMode(boolean retryDespiteFastFailMode) {
+    this.retryDespiteFastFailMode = retryDespiteFastFailMode;
+  }
+
+  public void setServer(ServerName server) {
+    this.server = server;
+  }
+
+  public void setTries(int tries) {
+    this.tries = tries;
+  }
+
+  public void clear() {
+    server = null;
+    fInfo = null;
+    didTry = false;
+    couldNotCommunicateWithServer.setValue(false);
+    retryDespiteFastFailMode = false;
+    tries = 0;
+  }
+
+  public FastFailInterceptorContext prepare(RetryingCallable<?> callable) {
+    return prepare(callable, 0);
+  }
+
+  public FastFailInterceptorContext prepare(RetryingCallable<?> callable,
+      int tries) {
+    if (callable instanceof RegionServerCallable) {
+      RegionServerCallable<?> retryingCallable = (RegionServerCallable<?>) callable;
+      server = retryingCallable.getLocation().getServerName();
+    }
+    this.tries = tries;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index cd4376d..c3a94e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -358,7 +358,7 @@ public class HTable implements HTableInterface, RegionLocator {
     this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 
-    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
+    this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
     // puts need to track errors globally due to how the APIs currently work.
     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java
new file mode 100644
index 0000000..f3f9168
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+
+/**
+ * Class that acts as a NoOpInterceptor. This class is used in case the
+ * {@link RetryingCallerInterceptor} was not configured correctly or an
+ * {@link RetryingCallerInterceptor} was never configured in the first place.
+ * 
+ */
+@InterfaceAudience.Private
+class NoOpRetryableCallerInterceptor extends RetryingCallerInterceptor {
+  private static final RetryingCallerInterceptorContext NO_OP_CONTEXT =
+      new NoOpRetryingInterceptorContext();
+
+  public NoOpRetryableCallerInterceptor() {
+  }
+
+  public NoOpRetryableCallerInterceptor(Configuration conf) {
+    super();
+  }
+
+  @Override
+  public void intercept(
+      RetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext)
+      throws PreemptiveFastFailException {
+  }
+
+  @Override
+  public void handleFailure(RetryingCallerInterceptorContext context,
+      Throwable t) throws IOException {
+  }
+
+  @Override
+  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext createEmptyContext() {
+    return NO_OP_CONTEXT;
+  }
+
+  @Override
+  public String toString() {
+    return "NoOpRetryableCallerInterceptor";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
new file mode 100644
index 0000000..1ccf43c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class NoOpRetryingInterceptorContext extends RetryingCallerInterceptorContext {
+
+  @Override
+  public void clear() {
+    // Do Nothing
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable) {
+    // Do Nothing
+    return this;
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable, int tries) {
+    // Do Nothing
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
new file mode 100644
index 0000000..4256120
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * 
+ * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
+ * feature.
+ * 
+ * The motivation is as follows : 
+ * In case where a large number of clients try and talk to a particular region server in hbase, if
+ * the region server goes down due to network problems, we might end up in a scenario where
+ * the clients would go into a state where they all start to retry.
+ * This behavior will set off many of the threads in pretty much the same path and they all would be
+ * sleeping giving rise to a state where the client either needs to create more threads to send new
+ * requests to other hbase machines or block because the client cannot create anymore threads.
+ * 
+ * In most cases the clients might prefer to have a bound on the number of threads that are created
+ * in order to send requests to hbase. This would mostly result in the client thread starvation.
+ * 
+ *  To circumvent this problem, the approach that is being taken here under is to let 1 of the many
+ *  threads who are trying to contact the regionserver with connection problems and let the other
+ *  threads get a {@link PreemptiveFastFailException} so that they can move on and take other
+ *  requests.
+ *  
+ *  This would give the client more flexibility on the kind of action he would want to take in cases
+ *  where the regionserver is down. He can either discard the requests and send a nack upstream
+ *  faster or have an application level retry or buffer the requests up so as to send them down to
+ *  hbase later.
+ *
+ */
+@InterfaceAudience.Private
+class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
+
+  public static final Log LOG = LogFactory
+      .getLog(PreemptiveFastFailInterceptor.class);
+
+  // amount of time to wait before we consider a server to be in fast fail
+  // mode
+  protected final long fastFailThresholdMilliSec;
+
+  // Keeps track of failures when we cannot talk to a server. Helps in
+  // fast failing clients if the server is down for a long time.
+  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
+      new ConcurrentHashMap<ServerName, FailureInfo>();
+
+  // We populate repeatedFailuresMap every time there is a failure. So, to
+  // keep it from growing unbounded, we garbage collect the failure information
+  // every cleanupInterval.
+  protected final long failureMapCleanupIntervalMilliSec;
+
+  protected volatile long lastFailureMapCleanupTimeMilliSec;
+
+  // clear failure Info. Used to clean out all entries.
+  // A safety valve, in case the client does not exit the
+  // fast fail mode for any reason.
+  private long fastFailClearingTimeMilliSec;
+
+  private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
+      new ThreadLocal<MutableBoolean>();
+
+  public PreemptiveFastFailInterceptor(Configuration conf) {
+    this.fastFailThresholdMilliSec = conf.getLong(
+        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
+    this.failureMapCleanupIntervalMilliSec = conf.getLong(
+        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
+    lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
+  }
+
+  public void intercept(FastFailInterceptorContext context)
+      throws PreemptiveFastFailException {
+    context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
+    if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
+      // In Fast-fail mode, all but one thread will fast fail. Check
+      // if we are that one chosen thread.
+      context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
+          .getFailureInfo()));
+      if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
+        LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
+            + context.getTries());
+        throw new PreemptiveFastFailException(
+            context.getFailureInfo().numConsecutiveFailures.get(),
+            context.getFailureInfo().timeOfFirstFailureMilliSec,
+            context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
+      }
+    }
+    context.setDidTry(true);
+  }
+
+  public void handleFailure(FastFailInterceptorContext context,
+      Throwable t) throws IOException {
+    handleThrowable(t, context.getServer(),
+        context.getCouldNotCommunicateWithServer());
+  }
+
+  public void updateFailureInfo(FastFailInterceptorContext context) {
+    updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
+        context.didTry(), context.getCouldNotCommunicateWithServer()
+            .booleanValue(), context.isRetryDespiteFastFailMode());
+  }
+
+  /**
+   * Handles failures encountered when communicating with a server.
+   *
+   * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
+   * Throws RepeatedConnectException if the client is in Fast fail mode.
+   *
+   * @param serverName
+   * @param t
+   *          - the throwable to be handled.
+   * @throws PreemptiveFastFailException
+   */
+  private void handleFailureToServer(ServerName serverName, Throwable t) {
+    if (serverName == null || t == null) {
+      return;
+    }
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    FailureInfo fInfo = repeatedFailuresMap.get(serverName);
+    if (fInfo == null) {
+      fInfo = new FailureInfo(currentTime);
+      FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
+
+      if (oldfInfo != null) {
+        fInfo = oldfInfo;
+      }
+    }
+    fInfo.timeOfLatestAttemptMilliSec = currentTime;
+    fInfo.numConsecutiveFailures.incrementAndGet();
+  }
+
+  public void handleThrowable(Throwable t1, ServerName serverName,
+      MutableBoolean couldNotCommunicateWithServer) throws IOException {
+    Throwable t2 = translateException(t1);
+    boolean isLocalException = !(t2 instanceof RemoteException);
+    if (isLocalException && isConnectionException(t2)) {
+      couldNotCommunicateWithServer.setValue(true);
+      handleFailureToServer(serverName, t2);
+    }
+  }
+
+  private Throwable translateException(Throwable t) throws IOException {
+    if (t instanceof NoSuchMethodError) {
+      // We probably can't recover from this exception by retrying.
+      LOG.error(t);
+      throw (NoSuchMethodError) t;
+    }
+
+    if (t instanceof NullPointerException) {
+      // The same here. This is probably a bug.
+      LOG.error(t.getMessage(), t);
+      throw (NullPointerException) t;
+    }
+
+    if (t instanceof UndeclaredThrowableException) {
+      t = t.getCause();
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException) t).unwrapRemoteException();
+    }
+    if (t instanceof DoNotRetryIOException) {
+      throw (DoNotRetryIOException) t;
+    }
+    if (t instanceof Error) {
+      throw (Error) t;
+    }
+    return t;
+  }
+
+  /**
+   * Check if the exception is something that indicates that we cannot
+   * contact/communicate with the server.
+   *
+   * @param e
+   * @return true when exception indicates that the client wasn't able to make contact with server
+   */
+  private boolean isConnectionException(Throwable e) {
+    if (e == null)
+      return false;
+    // This list covers most connectivity exceptions but not all.
+    // For example, in SocketOutputStream a plain IOException is thrown
+    // at times when the channel is closed.
+    return (e instanceof SocketTimeoutException
+        || e instanceof ConnectException || e instanceof ClosedChannelException
+        || e instanceof SyncFailedException || e instanceof EOFException
+        || e instanceof TimeoutException
+        || e instanceof ConnectionClosingException || e instanceof FailedServerException);
+  }
+
+  /**
+   * Occasionally cleans up unused information in repeatedFailuresMap.
+   *
+   * repeatedFailuresMap stores the failure information for all remote hosts
+   * that had failures. In order to avoid these from growing indefinitely,
+   * occassionallyCleanupFailureInformation() will clear these up once every
+   * cleanupInterval ms.
+   */
+  protected void occasionallyCleanupFailureInformation() {
+    long now = System.currentTimeMillis();
+    if (!(now > lastFailureMapCleanupTimeMilliSec
+        + failureMapCleanupIntervalMilliSec))
+      return;
+
+    // remove entries that haven't been attempted in a while
+    // No synchronization needed. It is okay if multiple threads try to
+    // remove the entry again and again from a concurrent hash map.
+    StringBuilder sb = new StringBuilder();
+    for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
+      if (now > entry.getValue().timeOfLatestAttemptMilliSec
+          + failureMapCleanupIntervalMilliSec) { // no recent failures
+        repeatedFailuresMap.remove(entry.getKey());
+      } else if (now > entry.getValue().timeOfFirstFailureMilliSec
+          + this.fastFailClearingTimeMilliSec) { // been failing for a long
+                                                 // time
+        LOG.error(entry.getKey()
+            + " been failing for a long time. clearing out."
+            + entry.getValue().toString());
+        repeatedFailuresMap.remove(entry.getKey());
+      } else {
+        sb.append(entry.getKey().toString()).append(" failing ")
+            .append(entry.getValue().toString()).append("\n");
+      }
+    }
+    if (sb.length() > 0) {
+      LOG.warn("Preemptive failure enabled for : " + sb.toString());
+    }
+    lastFailureMapCleanupTimeMilliSec = now;
+  }
+
+  /**
+   * Checks to see if we are in the Fast fail mode for requests to the server.
+   *
+   * If a client is unable to contact a server for more than
+   * fastFailThresholdMilliSec the client will get into fast fail mode.
+   *
+   * @param server
+   * @return true if the client is in fast fail mode for the server.
+   */
+  private boolean inFastFailMode(ServerName server) {
+    FailureInfo fInfo = repeatedFailuresMap.get(server);
+    // if fInfo is null --> The server is considered good.
+    // If the server is bad, wait long enough to believe that the server is
+    // down.
+    return (fInfo != null &&
+        EnvironmentEdgeManager.currentTime() >
+          (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
+  }
+
+  /**
+   * Checks to see if the current thread is already in FastFail mode for *some*
+   * server.
+   *
+   * @return true, if the thread is already in FF mode.
+   */
+  private boolean currentThreadInFastFailMode() {
+    return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
+        .get().booleanValue() == true));
+  }
+
+  /**
+   * Check to see if the client should try to connnect to the server, inspite of
+   * knowing that it is in the fast fail mode.
+   *
+   * The idea here is that we want just one client thread to be actively trying
+   * to reconnect, while all the other threads trying to reach the server will
+   * short circuit.
+   *
+   * @param fInfo
+   * @return true if the client should try to connect to the server.
+   */
+  protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+    // We believe that the server is down, But, we want to have just one
+    // client
+    // actively trying to connect. If we are the chosen one, we will retry
+    // and not throw an exception.
+    if (fInfo != null
+        && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
+      MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
+          .get();
+      if (threadAlreadyInFF == null) {
+        threadAlreadyInFF = new MutableBoolean();
+        this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
+      }
+      threadAlreadyInFF.setValue(true);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   *
+   * This function updates the Failure info for a particular server after the
+   * attempt to 
+   *
+   * @param server
+   * @param fInfo
+   * @param couldNotCommunicate
+   * @param retryDespiteFastFailMode
+   */
+  private void updateFailureInfoForServer(ServerName server,
+      FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
+      boolean retryDespiteFastFailMode) {
+    if (server == null || fInfo == null || didTry == false)
+      return;
+
+    // If we were able to connect to the server, reset the failure
+    // information.
+    if (couldNotCommunicate == false) {
+      LOG.info("Clearing out PFFE for server " + server.getServerName());
+      repeatedFailuresMap.remove(server);
+    } else {
+      // update time of last attempt
+      long currentTime = System.currentTimeMillis();
+      fInfo.timeOfLatestAttemptMilliSec = currentTime;
+
+      // Release the lock if we were retrying inspite of FastFail
+      if (retryDespiteFastFailMode) {
+        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+        threadRetryingInFastFailMode.get().setValue(false);
+      }
+    }
+
+    occasionallyCleanupFailureInformation();
+  }
+
+  @Override
+  public void intercept(RetryingCallerInterceptorContext context)
+      throws PreemptiveFastFailException {
+    if (context instanceof FastFailInterceptorContext) {
+      intercept((FastFailInterceptorContext) context);
+    }
+  }
+
+  @Override
+  public void handleFailure(RetryingCallerInterceptorContext context,
+      Throwable t) throws IOException {
+    if (context instanceof FastFailInterceptorContext) {
+      handleFailure((FastFailInterceptorContext) context, t);
+    }
+  }
+
+  @Override
+  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
+    if (context instanceof FastFailInterceptorContext) {
+      updateFailureInfo((FastFailInterceptorContext) context);
+    }
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext createEmptyContext() {
+    return new FastFailInterceptorContext();
+  }
+
+  protected boolean isServerInFailureMap(ServerName serverName) {
+    return this.repeatedFailuresMap.containsKey(serverName);
+  }
+
+  @Override
+  public String toString() {
+    return "PreemptiveFastFailInterceptor";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java
new file mode 100644
index 0000000..f372e2d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This class is designed to fit into the RetryingCaller class which forms the
+ * central piece of intelligence for the client side retries for most calls.
+ * 
+ * One can extend this class and intercept the RetryingCaller and add additional
+ * logic into the execution of a simple HTable operations like get, delete etc.
+ * 
+ * Concrete implementations of this calls are supposed to the thread safe. The
+ * object is used across threads to identify the fast failing threads.
+ * 
+ * For a concrete use case see {@link PreemptiveFastFailInterceptor}
+ * 
+ * Example use case : 
+ * try {
+ *   interceptor.intercept
+ *   doAction()
+ * } catch (Exception e) {
+ *   interceptor.handleFailure
+ * } finally {
+ *   interceptor.updateFaulireInfo
+ * }
+ * 
+ * The {@link RetryingCallerInterceptor} also acts as a factory
+ * for getting a new {@link RetryingCallerInterceptorContext}.
+ * 
+ */
+
+@InterfaceAudience.Private
+abstract class RetryingCallerInterceptor {
+
+  protected RetryingCallerInterceptor() {
+    // Empty constructor protected for NoOpRetryableCallerInterceptor
+  }
+
+  /**
+   * This returns the context object for the current call.
+   * 
+   * @return context : the context that needs to be used during this call.
+   */
+  public abstract RetryingCallerInterceptorContext createEmptyContext();
+
+  /**
+   * Call this function in case we caught a failure during retries.
+   * 
+   * @param context
+   *          : The context object that we obtained previously.
+   * @param t
+   *          : The exception that we caught in this particular try
+   * @throws IOException
+   */
+  public abstract void handleFailure(RetryingCallerInterceptorContext context,
+      Throwable t) throws IOException;
+
+  /**
+   * Call this function alongside the actual call done on the callable.
+   * 
+   * @param abstractRetryingCallerInterceptorContext
+   * @throws PreemptiveFastFailException
+   */
+  public abstract void intercept(
+      RetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext)
+      throws IOException;
+
+  /**
+   * Call this function to update at the end of the retry. This is not necessary
+   * to happen.
+   * 
+   * @param context
+   */
+  public abstract void updateFailureInfo(
+      RetryingCallerInterceptorContext context);
+
+  @Override
+  public abstract String toString();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
new file mode 100644
index 0000000..a9f414f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The context object used in the {@link RpcRetryingCaller} to enable
+ * {@link RetryingCallerInterceptor} to intercept calls.
+ * {@link RetryingCallerInterceptorContext} is the piece of information unique
+ * to a retrying call that transfers information from the call into the
+ * {@link RetryingCallerInterceptor} so that {@link RetryingCallerInterceptor}
+ * can take appropriate action according to the specific logic
+ *
+ */
+@InterfaceAudience.Private
+abstract class RetryingCallerInterceptorContext {
+  protected RetryingCallerInterceptorContext() {
+  }
+
+  /**
+   * This function clears the internal state of the context object.
+   */
+  public abstract void clear();
+
+  /**
+   * This prepares the context object by populating it with information specific
+   * to the implementation of the {@link RetryingCallerInterceptor} along with
+   * which this will be used.
+   * 
+   * @param callable
+   *          : The {@link RetryingCallable} that contains the information about
+   *          the call that is being made.
+   * @return A new {@link RetryingCallerInterceptorContext} object that can be
+   *         used for use in the current retrying call
+   */
+  public abstract RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable);
+
+  /**
+   * Telescopic extension that takes which of the many retries we are currently
+   * in.
+   * 
+   * @param callable
+   *          : The {@link RetryingCallable} that contains the information about
+   *          the call that is being made.
+   * @param tries
+   *          : The retry number that we are currently in.
+   * @return A new context object that can be used for use in the current
+   *         retrying call
+   */
+  public abstract RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable, int tries);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java
new file mode 100644
index 0000000..9799ec0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
+
+/**
+ * Factory implementation to provide the {@link HConnectionImplementation} with
+ * the implementation of the {@link RetryingCallerInterceptor} that we would use
+ * to intercept the {@link RpcRetryingCaller} during the course of their calls.
+ * 
+ */
+
+@InterfaceAudience.Private
+class RetryingCallerInterceptorFactory {
+  private static final Log LOG = LogFactory
+      .getLog(RetryingCallerInterceptorFactory.class);
+  private Configuration conf;
+  private final boolean failFast;
+  public static final RetryingCallerInterceptor NO_OP_INTERCEPTOR =
+      new NoOpRetryableCallerInterceptor(null);
+
+  public RetryingCallerInterceptorFactory(Configuration conf) {
+    this.conf = conf;
+    failFast = conf.getBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED,
+        HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT);
+  }
+
+  /**
+   * This builds the implementation of {@link RetryingCallerInterceptor} that we
+   * specify in the conf and returns the same.
+   * 
+   * To use {@link PreemptiveFastFailInterceptor}, set HBASE_CLIENT_ENABLE_FAST_FAIL_MODE to true.
+   * HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL is defaulted to {@link PreemptiveFastFailInterceptor}
+   * 
+   * @return The factory build method which creates the
+   *         {@link RetryingCallerInterceptor} object according to the
+   *         configuration.
+   */
+  public RetryingCallerInterceptor build() {
+    RetryingCallerInterceptor ret = NO_OP_INTERCEPTOR;
+    if (failFast) {
+      try {
+        Class<?> c = conf.getClass(
+            HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
+            PreemptiveFastFailInterceptor.class);
+        Constructor<?> constructor = c
+            .getDeclaredConstructor(Configuration.class);
+        constructor.setAccessible(true);
+        ret = (RetryingCallerInterceptor) constructor.newInstance(conf);
+      } catch (Exception e) {
+        ret = new PreemptiveFastFailInterceptor(conf);
+      }
+    }
+    LOG.trace("Using " + ret.toString() + " for intercepting the RpcRetryingCaller");
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index bf4c40b..97e6381 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
@@ -61,10 +62,19 @@ public class RpcRetryingCaller<T> {
   private final long pause;
   private final int retries;
   private final AtomicBoolean cancelled = new AtomicBoolean(false);
+  private final RetryingCallerInterceptor interceptor;
+  private final RetryingCallerInterceptorContext context;
 
   public RpcRetryingCaller(long pause, int retries) {
+    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  }
+  
+  public RpcRetryingCaller(long pause, int retries,
+      RetryingCallerInterceptor interceptor) {
     this.pause = pause;
     this.retries = retries;
+    this.interceptor = interceptor;
+    context = interceptor.createEmptyContext();
   }
 
   private int getRemainingTime(int callTimeout) {
@@ -83,7 +93,7 @@ public class RpcRetryingCaller<T> {
       return remainingTime;
     }
   }
-
+  
   public void cancel(){
     cancelled.set(true);
     synchronized (cancelled){
@@ -104,11 +114,15 @@ public class RpcRetryingCaller<T> {
     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
     this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    context.clear();
     for (int tries = 0;; tries++) {
       long expectedSleep;
       try {
         callable.prepare(tries != 0); // if called with false, check table status on ZK
+        interceptor.intercept(context.prepare(callable, tries));
         return callable.call(getRemainingTime(callTimeout));
+      } catch (PreemptiveFastFailException e) {
+        throw e;
       } catch (Throwable t) {
         ExceptionUtil.rethrowIfInterrupt(t);
         if (LOG.isTraceEnabled()) {
@@ -118,6 +132,7 @@ public class RpcRetryingCaller<T> {
         }
 
         // translateException throws exception when should not retry: i.e. when request is bad.
+        interceptor.handleFailure(context, t);
         t = translateException(t);
         callable.throwable(t, retries != 1);
         RetriesExhaustedException.ThrowableWithExtraContext qt =
@@ -139,6 +154,8 @@ public class RpcRetryingCaller<T> {
               ": " + callable.getExceptionMessageAdditionalDetail();
           throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
         }
+      } finally {
+        interceptor.updateFailureInfo(context);
       }
       try {
         if (expectedSleep > 0) {
@@ -188,7 +205,7 @@ public class RpcRetryingCaller<T> {
       }
     }
   }
-
+  
   /**
    * Get the good or the remote exception if any, throws the DoNotRetryIOException.
    * @param t the throwable to analyze

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 9b070a5..f482262 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -31,27 +31,38 @@ public class RpcRetryingCallerFactory {
   protected final Configuration conf;
   private final long pause;
   private final int retries;
+  private final RetryingCallerInterceptor interceptor;
 
   public RpcRetryingCallerFactory(Configuration conf) {
+    this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  }
+  
+  public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
     this.conf = conf;
     pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    this.interceptor = interceptor;
   }
 
   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new objects
     //  is cheap as it does not require parsing a complex structure.
-    return new RpcRetryingCaller<T>(pause, retries);
+    return new RpcRetryingCaller<T>(pause, retries, interceptor);
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  }
+  
+  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
+      RetryingCallerInterceptor interceptor) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
         configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
     if (rpcCallerFactoryClazz.equals(clazzName)) {
-      return new RpcRetryingCallerFactory(configuration);
+      return new RpcRetryingCallerFactory(configuration, interceptor);
     }
     return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
       new Class[] { Configuration.class }, new Object[] { configuration });

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java
new file mode 100644
index 0000000..cb8e5df
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+* Thrown when the client believes that we are trying to communicate to has
+* been repeatedly unresponsive for a while.
+*
+* On receiving such an exception. The HConnectionManager will skip all
+* retries and fast fail the operation.
+*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ConnectionClosingException extends IOException {
+  public ConnectionClosingException(String string) {
+    super(string);
+  }
+
+  private static final long serialVersionUID = -8980028569652624236L;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java
new file mode 100644
index 0000000..2d66d54
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java
@@ -0,0 +1,70 @@
+/**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+*/
+
+package org.apache.hadoop.hbase.exceptions;
+
+import java.net.ConnectException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Thrown when the client believes that we are trying to communicate to has
+ * been repeatedly unresponsive for a while.
+ *
+ * On receiving such an exception. The HConnectionManager will skip all
+ * retries and fast fail the operation.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public class PreemptiveFastFailException extends ConnectException {
+   private static final long serialVersionUID = 7129103682617007177L;
+   private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec;
+
+   /**
+    * @param count
+    * @param timeOfFirstFailureMilliSec
+    * @param timeOfLatestAttemptMilliSec
+    * @param serverName
+    */
+   public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
+       long timeOfLatestAttemptMilliSec, ServerName serverName) {
+     super("Exception happened " + count + " times. to" + serverName);
+     this.failureCount = count;
+     this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
+     this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
+   }
+
+   public long getFirstFailureAt() {
+     return timeOfFirstFailureMilliSec;
+   }
+
+   public long getLastAttemptAt() {
+     return timeOfLatestAttemptMilliSec;
+   }
+
+   public long getFailureCount() {
+     return failureCount;
+   }
+
+   public boolean wasOperationAttemptedByServer() {
+     return false;
+   }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index c520bbf..225a2c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -107,7 +108,6 @@ import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-
 /**
  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
  * <p>See HBaseServer
@@ -502,7 +502,7 @@ public class RpcClient {
       private void cleanup() {
         assert shouldCloseConnection.get();
 
-        IOException ie = new IOException("Connection to " + server + " is closing.");
+        IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
         while (true) {
           CallFuture cts = callsToWrite.poll();
           if (cts == null) {
@@ -717,7 +717,7 @@ public class RpcClient {
      */
     private void checkIsOpen() throws IOException {
       if (shouldCloseConnection.get()) {
-        throw new IOException(getName() + " is closing");
+        throw new ConnectionClosingException(getName() + " is closing");
       }
     }
 
@@ -907,7 +907,7 @@ public class RpcClient {
       }
 
       if (shouldCloseConnection.get()){
-        throw new IOException("This connection is closing");
+        throw new ConnectionClosingException("This connection is closing");
       }
 
       if (failedServers.isFailedServer(remoteId.getAddress())) {
@@ -1254,7 +1254,7 @@ public class RpcClient {
           itor.remove();
         } else if (allCalls) {
           long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
-          IOException ie = new IOException("Connection to " + getRemoteAddress()
+          IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
               + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
           c.setException(ie);
           itor.remove();
@@ -1510,8 +1510,8 @@ public class RpcClient {
         break;
       }
       if (connection.shouldCloseConnection.get()) {
-        throw new IOException("Call id=" + call.id + " on server "
-            + addr + " aborted: connection is closing");
+        throw new ConnectionClosingException("Call id=" + call.id +
+            " on server " + addr + " aborted: connection is closing");
       }
       try {
         synchronized (call) {
@@ -1559,6 +1559,9 @@ public class RpcClient {
     } else if (exception instanceof SocketTimeoutException) {
       return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
         " failed because " + exception).initCause(exception);
+    } else if (exception instanceof ConnectionClosingException){
+      return (ConnectionClosingException) new ConnectionClosingException(
+          "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
     } else {
       return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
         exception).initCause(exception);


Mime
View raw message