hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1464798 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-server/src/test/java/org/apache/hadoop/hbase/client/
Date Fri, 05 Apr 2013 00:40:21 GMT
Author: sershe
Date: Fri Apr  5 00:40:21 2013
New Revision: 1464798

URL: http://svn.apache.org/r1464798
Log:
HBASE-7649 client retry timeout doesn't need to do x2 fallback when going to different server

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Apr  5 00:40:21 2013
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -167,6 +168,8 @@ public class HConnectionManager {
   /** Default admin protocol class name. */
   public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
 
+  public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server";
+
   private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
 
   static {
@@ -513,10 +516,12 @@ public class HConnectionManager {
     private final Class<? extends AdminProtocol> adminClass;
     private final Class<? extends ClientProtocol> clientClass;
     private final long pause;
-    private final int numRetries;
+    private final int numTries;
     private final int maxRPCAttempts;
     private final int rpcTimeout;
     private final int prefetchRegionLimit;
+    private final boolean useServerTrackerForRetries;
+    private final long serverTrackerTimeout;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -602,7 +607,7 @@ public class HConnectionManager {
       }
       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-      this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
       this.maxRPCAttempts = conf.getInt(
           HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
@@ -613,7 +618,21 @@ public class HConnectionManager {
       this.prefetchRegionLimit = conf.getInt(
           HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
-
+      this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true);
+      long serverTrackerTimeout = 0;
+      if (this.useServerTrackerForRetries) {
+        // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+        // However, if we are too useful, we might fail very quickly due to retry count limit.
+        // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+        // retry time if normal retries were used. Then we will retry until this time runs out.
+        // If we keep hitting one server, the net effect will be the incremental backoff, and
+        // essentially the same number of retries as planned. If we have to do faster retries,
+        // we will do more retries in aggregate, but the user will be none the wiser.
+        for (int i = 0; i < this.numTries; ++i) {
+          serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
+        }
+      }
+      this.serverTrackerTimeout = serverTrackerTimeout;
       retrieveClusterId();
 
       // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
@@ -772,10 +791,10 @@ public class HConnectionManager {
 
           if (exceptionCaught != null)
             // It failed. If it's not the last try, we're going to wait a little
-          if (tries < numRetries) {
+          if (tries < numTries) {
             // tries at this point is 1 or more; decrement to start from 0.
             long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
-            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+            LOG.info("getMaster attempt " + tries + " of " + numTries +
               " failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught);
 
             try {
@@ -788,7 +807,7 @@ public class HConnectionManager {
 
           } else {
             // Enough tries, we stop now
-            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+            LOG.info("getMaster attempt " + tries + " of " + numTries +
               " failed; no more retrying.", exceptionCaught);
             throw new MasterNotRunningException(exceptionCaught);
           }
@@ -1103,7 +1122,7 @@ public class HConnectionManager {
           return location;
         }
       }
-      int localNumRetries = retry ? numRetries : 1;
+      int localNumRetries = retry ? numTries : 1;
       // build the key of the meta region we should be looking for.
       // the extra 9's on the end are necessary to allow "exact" matches
       // without knowing the precise region names.
@@ -1112,7 +1131,7 @@ public class HConnectionManager {
       for (int tries = 0; true; tries++) {
         if (tries >= localNumRetries) {
           throw new NoServerForRegionException("Unable to find region for "
-            + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
+            + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
         }
 
         HRegionLocation metaLocation = null;
@@ -1210,13 +1229,13 @@ public class HConnectionManager {
           if (e instanceof RemoteException) {
             e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
           }
-          if (tries < numRetries - 1) {
+          if (tries < numTries - 1) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("locateRegionInMeta parentTable=" +
                 Bytes.toString(parentTable) + ", metaLocation=" +
                 ((metaLocation == null)? "null": "{" + metaLocation + "}") +
                 ", attempt=" + tries + " of " +
-                this.numRetries + " failed; retrying after sleep of " +
+                this.numTries + " failed; retrying after sleep of " +
                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
             }
           } else {
@@ -1969,6 +1988,8 @@ public class HConnectionManager {
       private final List<Action<R>> toReplay;
       private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
         inProgress;
+
+      private ServerErrorTracker errorsByServer = null;
       private int curNumRetries;
 
       // Notified when a tasks is done
@@ -1994,10 +2015,11 @@ public class HConnectionManager {
        * Group a list of actions per region servers, and send them. The created MultiActions are
        *  added to the inProgress list.
        * @param actionsList
-       * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+       * @param isRetry Whether we are retrying these actions. If yes, backoff
+       *                time may be applied before new requests.
        * @throws IOException - if we can't locate a region after multiple retries.
        */
-      private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+      private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
         // group per location => regions server
         final Map<HRegionLocation, MultiAction<R>> actionsByServer =
           new HashMap<HRegionLocation, MultiAction<R>>();
@@ -2022,15 +2044,25 @@ public class HConnectionManager {
 
         // Send the queries and add them to the inProgress list
         for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+          long backoffTime = 0;
+          if (isRetry) {
+            if (hci.isUsingServerTrackerForRetries()) {
+              assert this.errorsByServer != null;
+              backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
+            } else {
+              // curNumRetries starts with one, subtract to start from 0.
+              backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
+            }
+          }
           Callable<MultiResponse> callable =
-            createDelayedCallable(sleepTime, e.getKey(), e.getValue());
-          if (LOG.isTraceEnabled() && (sleepTime > 0)) {
+            createDelayedCallable(backoffTime, e.getKey(), e.getValue());
+          if (LOG.isTraceEnabled() && isRetry) {
             StringBuilder sb = new StringBuilder();
             for (Action<R> action : e.getValue().allActions()) {
               sb.append(Bytes.toStringBinary(action.getAction().getRow())).append(';');
             }
-            LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
-              + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
+            LOG.trace("Will retry requests to [" + e.getKey().getHostnamePort()
+              + "] after delay of [" + backoffTime + "] for rows [" + sb.toString() + "]");
           }
           Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
             new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
@@ -2044,9 +2076,7 @@ public class HConnectionManager {
       * @throws IOException
       */
       private void doRetry() throws IOException{
-        // curNumRetries at this point is 1 or more; decrement to start from 0.
-        final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries - 1);
-        submit(this.toReplay, sleepTime);
+        submit(this.toReplay, true);
         this.toReplay.clear();
       }
 
@@ -2085,7 +2115,7 @@ public class HConnectionManager {
         }
 
         // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
-        submit(listActions, 0);
+        submit(listActions, false);
 
         // LastRetry is true if, either:
         //  we had an exception 'DoNotRetry'
@@ -2094,7 +2124,7 @@ public class HConnectionManager {
         boolean lastRetry = false;
         // despite its name numRetries means number of tries. So if numRetries == 1 it means we
         //  won't retry. And we compare vs. 2 in case someone set it to zero.
-        boolean noRetry = (hci.numRetries < 2);
+        boolean noRetry = (hci.numTries < 2);
 
         // Analyze and resubmit until all actions are done successfully or failed after numRetries
         while (!this.inProgress.isEmpty()) {
@@ -2112,7 +2142,7 @@ public class HConnectionManager {
           } catch (ExecutionException e) {
             exception = e;
           }
-
+          HRegionLocation location = currentTask.getSecond();
           // Error case: no result at all for this multi action. We need to redo all actions
           if (responses == null) {
             for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
@@ -2120,14 +2150,14 @@ public class HConnectionManager {
                 Row row = action.getAction();
                 // Do not use the exception for updating cache because it might be coming from
                 // any of the regions in the MultiAction.
-                hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
+                hci.updateCachedLocations(tableName, row, null, location);
                 if (noRetry) {
-                  errors.add(exception, row, currentTask);
+                  errors.add(exception, row, location);
                 } else {
                   if (isTraceEnabled) {
-                    retriedErrors.add(exception, row, currentTask);
+                    retriedErrors.add(exception, row, location);
                   }
-                  lastRetry = addToReplay(nbRetries, action);
+                  lastRetry = addToReplay(nbRetries, action, location);
                 }
               }
             }
@@ -2146,14 +2176,14 @@ public class HConnectionManager {
                 // Failure: retry if it's make sense else update the errors lists
                 if (result == null || result instanceof Throwable) {
                   Row row = correspondingAction.getAction();
-                  hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
+                  hci.updateCachedLocations(this.tableName, row, result, location);
                   if (result instanceof DoNotRetryIOException || noRetry) {
-                    errors.add((Exception)result, row, currentTask);
+                    errors.add((Exception)result, row, location);
                   } else {
                     if (isTraceEnabled) {
-                      retriedErrors.add((Exception)result, row, currentTask);
+                      retriedErrors.add((Exception)result, row, location);
                     }
-                    lastRetry = addToReplay(nbRetries, correspondingAction);
+                    lastRetry = addToReplay(nbRetries, correspondingAction, location);
                   }
                 } else // success
                   if (callback != null) {
@@ -2186,11 +2216,10 @@ public class HConnectionManager {
         private List<Row> actions = new ArrayList<Row>();
         private List<String> addresses = new ArrayList<String>();
 
-        public void add(Exception ex, Row row,
-          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> obj) {
+        public void add(Exception ex, Row row, HRegionLocation location) {
           exceptions.add(ex);
           actions.add(row);
-          addresses.add(obj.getSecond().getHostnamePort());
+          addresses.add(location.getHostnamePort());
         }
 
         public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
@@ -2219,17 +2248,24 @@ public class HConnectionManager {
        * Put the action that has to be retried in the Replay list.
        * @return true if we're out of numRetries and it's the last retry.
        */
-      private boolean addToReplay(int[] nbRetries, Action<R> action) {
+      private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
         this.toReplay.add(action);
         nbRetries[action.getOriginalIndex()]++;
         if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
           this.curNumRetries = nbRetries[action.getOriginalIndex()];
         }
-        // numRetries means number of tries, while curNumRetries means current number of retries. So
-        //  we need to add 1 to make them comparable. And as we look for the last try we compare
-        //  with '>=' and no '>'. And we need curNumRetries to means what it says as we don't want
-        //  to initialize it to 1.
-        return ( (this.curNumRetries +1) >= hci.numRetries);
+        if (hci.isUsingServerTrackerForRetries()) {
+          if (this.errorsByServer == null) {
+            this.errorsByServer = hci.createServerErrorTracker();
+          }
+          this.errorsByServer.reportServerError(source);
+          return !this.errorsByServer.canRetryMore();
+        } else {
+          // We need to add 1 to make tries and retries comparable. And as we look for
+          // the last try we compare with '>=' and not '>'. And we need curNumRetries
+          // to means what it says as we don't want to initialize it to 1.
+          return ((this.curNumRetries + 1) >= hci.numTries);
+        }
       }
 
       /**
@@ -2521,8 +2557,102 @@ public class HConnectionManager {
     void setRpcEngine(RpcClientEngine engine) {
       this.rpcEngine = engine;
     }
-  }
 
+    /**
+     * The record of errors for servers. Visible for testing.
+     */
+    @VisibleForTesting
+    static class ServerErrorTracker {
+      private final Map<HRegionLocation, ServerErrors> errorsByServer =
+          new HashMap<HRegionLocation, ServerErrors>();
+      private long canRetryUntil = 0;
+
+      public ServerErrorTracker(long timeout) {
+        LOG.info("Server tracker timeout is " + timeout + "ms");
+        this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+      }
+
+      boolean canRetryMore() {
+        return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+      }
+
+      /**
+       * Calculates the back-off time for a retrying request to a particular server.
+       * This is here, and package private, for testing (no good way to get at it).
+       * @param server The server in question.
+       * @param basePause The default hci pause.
+       * @return The time to wait before sending next request.
+       */
+      long calculateBackoffTime(HRegionLocation server, long basePause) {
+        long result = 0;
+        ServerErrors errorStats = errorsByServer.get(server);
+        if (errorStats != null) {
+          result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
+          // Adjust by the time we already waited since last talking to this server.
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          long timeSinceLastError = now - errorStats.getLastErrorTime();
+          if (timeSinceLastError > 0) {
+            result = Math.max(0, result - timeSinceLastError);
+          }
+          // Finally, see if the backoff time overshoots the timeout.
+          if (result > 0 && (now + result > this.canRetryUntil)) {
+            result = Math.max(0, this.canRetryUntil - now);
+          }
+        }
+        return result;
+      }
+
+      /**
+       * Reports that there was an error on the server to do whatever bean-counting necessary.
+       * This is here, and package private, for testing (no good way to get at it).
+       * @param server The server in question.
+       */
+      void reportServerError(HRegionLocation server) {
+        ServerErrors errors = errorsByServer.get(server);
+        if (errors != null) {
+          errors.addError();
+        } else {
+          errorsByServer.put(server, new ServerErrors());
+        }
+      }
+
+      /**
+       * The record of errors for a server.
+       */
+      private static class ServerErrors {
+        public long lastErrorTime;
+        public int retries;
+
+        public ServerErrors() {
+          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+          this.retries = 0;
+        }
+
+        public void addError() {
+          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+          ++this.retries;
+        }
+
+        public long getLastErrorTime() {
+          return this.lastErrorTime;
+        }
+      }
+    }
+
+    public boolean isUsingServerTrackerForRetries() {
+      return this.useServerTrackerForRetries;
+    }
+    /**
+     * Creates the server error tracker to use inside process.
+     * Currently, to preserve the main assumption about current retries, and to work well with
+     * the retry-limit-based calculation, the calculation is local per Process object.
+     * We may benefit from connection-wide tracking of server errors.
+     * @return ServerErrorTracker to use.
+     */
+    ServerErrorTracker createServerErrorTracker() {
+      return new ServerErrorTracker(this.serverTrackerTimeout);
+    }
+  }
   /**
    * Set the number of retries to use serverside when trying to communicate
    * with another server over {@link HConnection}.  Used updating catalog

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java Fri Apr  5 00:40:21 2013
@@ -24,6 +24,7 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
 
@@ -42,9 +43,9 @@ public abstract class IngestIntegrationT
   protected HBaseCluster cluster;
   private LoadTestTool loadTool;
 
-  protected void setUp(int numSlavesBase) throws Exception {
+  protected void setUp(int numSlavesBase, Configuration conf) throws Exception {
     tableName = this.getClass().getSimpleName();
-    util = new IntegrationTestingUtility();
+    util = (conf == null) ? new IntegrationTestingUtility() : new IntegrationTestingUtility(conf);
     LOG.info("Initializing cluster with " + numSlavesBase + " servers");
     util.initializeCluster(numSlavesBase);
     LOG.info("Done initializing cluster");
@@ -58,6 +59,10 @@ public abstract class IngestIntegrationT
     Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
   }
 
+  protected void setUp(int numSlavesBase) throws Exception {
+    setUp(numSlavesBase, null);
+  }
+
   protected void tearDown() throws Exception {
     LOG.info("Restoring the cluster");
     util.restoreCluster();

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java Fri Apr  5 00:40:21 2013
@@ -27,6 +27,8 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChaosMonkey;
 import org.apache.hadoop.hbase.util.Pair;
@@ -100,7 +102,9 @@ public class IntegrationTestRebalanceAnd
   @Before
   @SuppressWarnings("unchecked")
   public void setUp() throws Exception {
-    super.setUp(NUM_SLAVES_BASE);
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HConnectionManager.RETRIES_BY_SERVER, "true");
+    super.setUp(NUM_SLAVES_BASE, conf);
 
     ChaosMonkey.Policy chaosPolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
       CHAOS_EVERY_MS, new UnbalanceKillAndRebalanceAction());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri Apr  5 00:40:21 2013
@@ -18,10 +18,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -42,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
@@ -55,6 +53,8 @@ import org.apache.hadoop.hbase.master.HM
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
@@ -304,13 +304,13 @@ public class TestHCM {
 
     // Hijack the number of retry to fail immediately instead of retrying: there will be no new
     //  connection to the master
-    Field numRetries = conn.getClass().getDeclaredField("numRetries");
-    numRetries.setAccessible(true);
+    Field numTries = conn.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
     Field modifiersField = Field.class.getDeclaredField("modifiers");
     modifiersField.setAccessible(true);
-    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
-    numRetries.set(conn, 1);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(conn);
+    numTries.set(conn, 1);
 
     // We do a put and expect the cache to be updated, even if we don't retry
     LOG.info("Put starting");
@@ -379,7 +379,7 @@ public class TestHCM {
       "Previous server was "+destServer.getServerName().getHostAndPort(),
       curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
 
-    numRetries.set(conn, prevNumRetriesVal);
+    numTries.set(conn, prevNumRetriesVal);
     table.close();
   }
 
@@ -705,13 +705,13 @@ public class TestHCM {
         conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
 
     // Hijack the number of retry to fail after 2 tries
-    Field numRetries = conn.getClass().getDeclaredField("numRetries");
-    numRetries.setAccessible(true);
+    Field numTries = conn.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
     Field modifiersField = Field.class.getDeclaredField("modifiers");
     modifiersField.setAccessible(true);
-    modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numRetries.get(conn);
-    numRetries.set(conn, 2);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(conn);
+    numTries.set(conn, 2);
 
     Put put3 = new Put(ROW_X);
     put3.add(FAM_NAM, ROW_X, ROW_X);
@@ -722,10 +722,83 @@ public class TestHCM {
     table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
                                                  // second we get RegionMovedException.
 
-    numRetries.set(conn, prevNumRetriesVal);
+    numTries.set(conn, prevNumRetriesVal);
     table.close();
     conn.close();
   }
 
+  @Test
+  public void testErrorBackoffTimeCalculation() throws Exception {
+    final long ANY_PAUSE = 1000;
+    HRegionInfo ri = new HRegionInfo(TABLE_NAME);
+    HRegionLocation location = new HRegionLocation(ri, new ServerName("127.0.0.1", 1, 0));
+    HRegionLocation diffLocation = new HRegionLocation(ri, new ServerName("127.0.0.1", 2, 0));
+
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    try {
+      long timeBase = timeMachine.currentTimeMillis();
+      long largeAmountOfTime = ANY_PAUSE * 1000;
+      HConnectionImplementation.ServerErrorTracker tracker =
+          new HConnectionImplementation.ServerErrorTracker(largeAmountOfTime);
+
+      // The default backoff is 0.
+      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // Check some backoff values from HConstants sequence.
+      tracker.reportServerError(location);
+      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      tracker.reportServerError(location);
+      tracker.reportServerError(location);
+      tracker.reportServerError(location);
+      assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // All of this shouldn't affect backoff for different location.
+
+      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+      tracker.reportServerError(diffLocation);
+      assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+
+      // But should still work for a different region in the same location.
+      HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
+      HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
+      assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
+
+      // Check with different base.
+      assertEqualsWithJitter(ANY_PAUSE * 4,
+          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
+
+      // See that time from last error is taken into account. Time shift is applied after jitter,
+      // so pass the original expected backoff as the base for jitter.
+      long timeShift = (long)(ANY_PAUSE * 0.5);
+      timeMachine.setValue(timeBase + timeShift);
+      assertEqualsWithJitter(ANY_PAUSE * 2 - timeShift,
+        tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
+
+      // However we should not go into negative.
+      timeMachine.setValue(timeBase + ANY_PAUSE * 100);
+      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+      // We also should not go over the boundary; last retry would be on it.
+      long timeLeft = (long)(ANY_PAUSE * 0.5);
+      timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
+      assertTrue(tracker.canRetryMore());
+      tracker.reportServerError(location);
+      assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
+      timeMachine.setValue(timeBase + largeAmountOfTime);
+      assertFalse(tracker.canRetryMore());
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private static void assertEqualsWithJitter(long expected, long actual) {
+    assertEqualsWithJitter(expected, actual, expected);
+  }
+
+  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
+    assertTrue("Value not within jitter: " + expected + " vs " + actual,
+        Math.abs(actual - expected) <= (0.01f * jitterBase));
+  }
 }
 



Mime
View raw message