hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1536871 - in /hbase/branches/0.96: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/test/java/org/apache/hadoop/hbase/client/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apa...
Date Tue, 29 Oct 2013 19:50:08 GMT
Author: nkeywal
Date: Tue Oct 29 19:50:08 2013
New Revision: 1536871

URL: http://svn.apache.org/r1536871
Log:
HBASE-9843 Various fixes in client code

Modified:
    hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
    hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java

Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
(original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
Tue Oct 29 19:50:08 2013
@@ -87,7 +87,9 @@ import org.cloudera.htrace.Trace;
  */
 class AsyncProcess<CResult> {
   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
-  private final static int START_LOG_ERRORS_CNT = 4;
+  protected static final AtomicLong COUNTER = new AtomicLong();
+  protected final long id;
+  private final int startLogErrorsCnt;
   protected final HConnection hConnection;
   protected final TableName tableName;
   protected final ExecutorService pool;
@@ -97,6 +99,7 @@ class AsyncProcess<CResult> {
   protected final AtomicBoolean hasError = new AtomicBoolean(false);
   protected final AtomicLong tasksSent = new AtomicLong(0);
   protected final AtomicLong tasksDone = new AtomicLong(0);
+  protected final AtomicLong retriesCnt = new AtomicLong(0);
   protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
       new ConcurrentHashMap<String, AtomicInteger>();
   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
@@ -121,7 +124,6 @@ class AsyncProcess<CResult> {
   protected final int maxConcurrentTasksPerServer;
   protected final long pause;
   protected int numTries;
-  protected final boolean useServerTrackerForRetries;
   protected int serverTrackerTimeout;
   protected RpcRetryingCallerFactory rpcCallerFactory;
 
@@ -205,6 +207,8 @@ class AsyncProcess<CResult> {
     this.pool = pool;
     this.callback = callback;
 
+    this.id = COUNTER.incrementAndGet();
+
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -217,6 +221,11 @@ class AsyncProcess<CResult> {
     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
 
+    // A few failure is fine: region moved, then is not opened, then is overloaded. We try
+    //  to have an acceptable heuristic for the number of errors we don't log.
+    //  9 was chosen because we wait for 1s at this stage.
+    this.startLogErrorsCnt = conf.getInt("hbase.client.start.log.errors.counter", 9);
+
     if (this.maxTotalConcurrentTasks <= 0) {
       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
     }
@@ -229,23 +238,19 @@ class AsyncProcess<CResult> {
           maxConcurrentTasksPerRegion);
     }
 
-    this.useServerTrackerForRetries =
-        conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
-
-    if (this.useServerTrackerForRetries) {
-      // Server tracker allows us to do faster, and yet useful (hopefully), retries.
-      // However, if we are too useful, we might fail very quickly due to retry count limit.
-      // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
-      // retry time if normal retries were used. Then we will retry until this time runs
out.
-      // If we keep hitting one server, the net effect will be the incremental backoff, and
-      // essentially the same number of retries as planned. If we have to do faster retries,
-      // we will do more retries in aggregate, but the user will be none the wiser.
-      this.serverTrackerTimeout = 0;
-      for (int i = 0; i < this.numTries; ++i) {
-        serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
-      }
+    // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+    // However, if we are too useful, we might fail very quickly due to retry count limit.
+    // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+    // retry time if normal retries were used. Then we will retry until this time runs out.
+    // If we keep hitting one server, the net effect will be the incremental backoff, and
+    // essentially the same number of retries as planned. If we have to do faster retries,
+    // we will do more retries in aggregate, but the user will be none the wiser.
+    this.serverTrackerTimeout = 0;
+    for (int i = 0; i < this.numTries; ++i) {
+      serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
     }
 
+
     this.rpcCallerFactory = rpcCaller;
   }
 
@@ -291,7 +296,7 @@ class AsyncProcess<CResult> {
       Iterator<? extends Row> it = rows.iterator();
       while (it.hasNext()) {
         Row r = it.next();
-        HRegionLocation loc = findDestLocation(r, 1, posInList);
+        HRegionLocation loc = findDestLocation(r, posInList);
 
         if (loc == null) { // loc is null if there is an error such as meta not available.
           it.remove();
@@ -332,18 +337,17 @@ class AsyncProcess<CResult> {
    * Find the destination.
    *
    * @param row          the row
-   * @param numAttempt   the num attempt
    * @param posInList    the position in the list
    * @return the destination. Null if we couldn't find it.
    */
-  private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
-    if (row == null) throw new IllegalArgumentException("row cannot be null");
+  private HRegionLocation findDestLocation(Row row, int posInList) {
+    if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
     HRegionLocation loc = null;
     IOException locationException = null;
     try {
       loc = hConnection.locateRegion(this.tableName, row.getRow());
       if (loc == null) {
-        locationException = new IOException("No location found, aborting submit for" +
+        locationException = new IOException("#" + id + ", no location found, aborting submit
for" +
             " tableName=" + tableName +
             " rowkey=" + Arrays.toString(row.getRow()));
       }
@@ -353,7 +357,7 @@ class AsyncProcess<CResult> {
     if (locationException != null) {
       // There are multiple retries in locateRegion already. No need to add new.
       // We can't continue with this row, hence it's the last retry.
-      manageError(numAttempt, posInList, row, false, locationException, null);
+      manageError(posInList, row, false, locationException, null);
       return null;
     }
 
@@ -460,12 +464,17 @@ class AsyncProcess<CResult> {
   private void submit(List<Action<Row>> initialActions,
                       List<Action<Row>> currentActions, int numAttempt,
                       final HConnectionManager.ServerErrorTracker errorsByServer) {
+
+    if (numAttempt > 1){
+      retriesCnt.incrementAndGet();
+    }
+
     // group per location => regions server
     final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
         new HashMap<HRegionLocation, MultiAction<Row>>();
 
     for (Action<Row> action : currentActions) {
-      HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());
+      HRegionLocation loc = findDestLocation(action.getAction(), action.getOriginalIndex());
       if (loc != null) {
         addAction(loc, action, actionsByServer);
       }
@@ -503,7 +512,8 @@ class AsyncProcess<CResult> {
             try {
               res = createCaller(callable).callWithoutRetries(callable);
             } catch (IOException e) {
-              LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt
+
+              LOG.warn("#" + id + ", call to " + loc.getServerName() +
+                  " failed numAttempt=" + numAttempt +
                 ", resubmitting all since not sure where we are at", e);
               resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
               return;
@@ -522,7 +532,7 @@ class AsyncProcess<CResult> {
         // This should never happen. But as the pool is provided by the end user, let's secure
         //  this a little.
         decTaskCounters(multiAction.getRegions(), loc.getServerName());
-        LOG.warn("The task was rejected by the pool. This is unexpected." +
+        LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
             " Server is " + loc.getServerName(), ree);
         // We're likely to fail again, but this will increment the attempt counter, so it
will
         //  finish.
@@ -551,7 +561,6 @@ class AsyncProcess<CResult> {
   /**
    * Check that we can retry acts accordingly: logs, set the error status, call the callbacks.
    *
-   * @param numAttempt    the number of this attempt
    * @param originalIndex the position in the list sent
    * @param row           the row
    * @param canRetry      if false, we won't retry whatever the settings.
@@ -559,13 +568,10 @@ class AsyncProcess<CResult> {
    * @param location      the location, if any (can be null)
    * @return true if the action can be retried, false otherwise.
    */
-  private boolean manageError(int numAttempt, int originalIndex, Row row, boolean canRetry,
+  private boolean manageError(int originalIndex, Row row, boolean canRetry,
                               Throwable throwable, HRegionLocation location) {
-    if (canRetry) {
-      if (numAttempt >= numTries ||
-          (throwable != null && throwable instanceof DoNotRetryIOException)) {
-        canRetry = false;
-      }
+    if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException)
{
+      canRetry = false;
     }
     byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
 
@@ -608,15 +614,14 @@ class AsyncProcess<CResult> {
     List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
     for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet())
{
       for (Action<Row> action : e.getValue()) {
-        if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
-            true, t, location)) {
+        if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location))
{
           toReplay.add(action);
         }
       }
     }
 
     if (toReplay.isEmpty()) {
-      LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
+      LOG.warn("#" + id + ", attempt #" + numAttempt + "/" + numTries + " failed for all
" +
         initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
     } else {
       submit(initialActions, toReplay, numAttempt, errorsByServer);
@@ -628,7 +633,7 @@ class AsyncProcess<CResult> {
    *
    * @param initialActions - the whole action list
    * @param rsActions      - the actions for this location
-   * @param location       - the location
+   * @param location       - the location. It's used as a server name.
    * @param responses      - the response, if any
    * @param numAttempt     - the attempt
    */
@@ -638,8 +643,8 @@ class AsyncProcess<CResult> {
                                   HConnectionManager.ServerErrorTracker errorsByServer) {
 
     if (responses == null) {
-      LOG.info("Attempt #" + numAttempt + "/" + numTries + " failed all ops, trying resubmit,"
+
-        location);
+      LOG.info("#" + id + ", attempt #" + numAttempt + "/" + numTries +
+          " failed all ops, trying resubmit," + location);
       resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
       return;
     }
@@ -670,14 +675,15 @@ class AsyncProcess<CResult> {
           failureCount++;
           if (!regionFailureRegistered) { // We're doing this once per location.
             regionFailureRegistered= true;
+            // The location here is used as a server name.
             hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
-            if (errorsByServer != null) {
+            if (failureCount == 1) {
               errorsByServer.reportServerError(location);
-              canRetry = errorsByServer.canRetryMore();
+              canRetry = errorsByServer.canRetryMore(numAttempt);
             }
           }
 
-          if (manageError(numAttempt, correspondingAction.getOriginalIndex(), row, canRetry,
+          if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
               throwable, location)) {
             toReplay.add(correspondingAction);
           }
@@ -694,21 +700,24 @@ class AsyncProcess<CResult> {
     }
 
     if (!toReplay.isEmpty()) {
-      long backOffTime = (errorsByServer != null ?
-          errorsByServer.calculateBackoffTime(location, pause) :
-          ConnectionUtils.getPauseTime(pause, numAttempt));
-      if (numAttempt > START_LOG_ERRORS_CNT && LOG.isDebugEnabled()) {
+      // We have two contradicting needs here:
+      //  1) We want to get the new location after having slept, as it may change.
+      //  2) We want to take into account the location when calculating the sleep time.
+      // It should be possible to have some heuristics to take the right decision. Short
term,
+      //  we go for one.
+      long backOffTime = errorsByServer.calculateBackoffTime(location, pause);
+      if (numAttempt > startLogErrorsCnt) {
         // We use this value to have some logs when we have multiple failures, but not too
many
         //  logs, as errors are to be expected when a region moves, splits and so on
-        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " failed " + failureCount +
-            " ops , resubmitting " + toReplay.size() + ", " + location + ", last exception
was: " +
-            (throwable == null ? "null" : throwable.getMessage()) +
-            ", sleeping " + backOffTime + "ms");
+        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+            location.getServerName(), throwable, backOffTime, true,
+            errorsByServer.getStartTrackingTime()));
       }
+
       try {
         Thread.sleep(backOffTime);
       } catch (InterruptedException e) {
-        LOG.warn("Not sent: " + toReplay.size() + " operations, " + location, e);
+        LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + location,
e);
         Thread.interrupted();
         return;
       }
@@ -717,16 +726,46 @@ class AsyncProcess<CResult> {
     } else {
       if (failureCount != 0) {
         // We have a failure but nothing to retry. We're done, it's a final failure..
-        LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount
+
-            " ops on " + location.getServerName() + " NOT resubmitting. " + location);
-      } else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled())
{
+        LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
+            location.getServerName(), throwable, -1, false,
+            errorsByServer.getStartTrackingTime()));
+      } else if (numAttempt > startLogErrorsCnt + 1) {
         // The operation was successful, but needed several attempts. Let's log this.
-        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " finally suceeded, size="
+
-          toReplay.size());
+        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+            location.getServerName(), throwable, -1, false,
+            errorsByServer.getStartTrackingTime()));
       }
     }
   }
 
+  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
+                           Throwable error, long backOffTime, boolean willRetry, String startTime){
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("#").append(id).append(", table=").append(tableName).
+        append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" ");
+
+    if (failureCount > 0 || error != null){
+      sb.append("failed ").append(failureCount).append(" ops").append(", last exception was:
").
+          append(error == null ? "null" : error.getMessage());
+    }else {
+      sb.append("SUCCEEDED");
+    }
+
+    sb.append(" on server ").append(sn);
+
+    sb.append(", tracking started at ").append(startTime);
+
+    if (willRetry) {
+      sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms").
+          append(", will replay ").append(replaySize).append(" ops.");
+    } else if (failureCount > 0) {
+      sb.append(" - FAILED, NOT RETRYING ANYMORE");
+    }
+
+    return sb.toString();
+  }
+
   /**
    * Waits for another task to finish.
    * @param currentNumberOfTask - the number of task finished when calling the method.
@@ -738,7 +777,7 @@ class AsyncProcess<CResult> {
           this.tasksDone.wait(100);
         }
       } catch (InterruptedException e) {
-        throw new InterruptedIOException("Interrupted." +
+        throw new InterruptedIOException("#" + id + ", interrupted." +
             " currentNumberOfTask=" + currentNumberOfTask +
             ",  tableName=" + tableName + ", tasksDone=" + tasksDone.get());
       }
@@ -756,9 +795,10 @@ class AsyncProcess<CResult> {
       long now = EnvironmentEdgeManager.currentTimeMillis();
       if (now > lastLog + 10000) {
         lastLog = now;
-        LOG.info(": Waiting for the global number of running tasks to be equals or less than
"
+        LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
             + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
-            ", currentTasksDone=" + currentTasksDone + ", tableName=" + tableName);
+            ", currentTasksDone=" + currentTasksDone + ", retries=" + retriesCnt.get() +
+            " hasError=" + hasError() + ", tableName=" + tableName);
       }
       waitForNextTaskDone(currentTasksDone);
       currentTasksDone = this.tasksDone.get();
@@ -848,10 +888,6 @@ class AsyncProcess<CResult> {
    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
    */
   protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
-    if (useServerTrackerForRetries){
-      return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout);
-    }else {
-      return null;
-    }
+    return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
   }
 }

Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Tue Oct 29 19:50:08 2013
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.SocketException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -2637,15 +2638,23 @@ public class HConnectionManager {
     // We need a concurrent map here, as we could have multiple threads updating it in parallel.
     private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
         new ConcurrentHashMap<HRegionLocation, ServerErrors>();
-    private long canRetryUntil = 0;
+    private final long canRetryUntil;
+    private final int maxRetries;
+    private final String startTrackingTime;
 
-    public ServerErrorTracker(long timeout) {
-      LOG.trace("Server tracker timeout is " + timeout + "ms");
+    public ServerErrorTracker(long timeout, int maxRetries) {
+      this.maxRetries = maxRetries;
       this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+      this.startTrackingTime = new Date().toString();
     }
 
-    boolean canRetryMore() {
-      return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+    /**
+     * We stop to retry when we have exhausted BOTH the number of retries and the time allocated.
+     */
+    boolean canRetryMore(int numRetry) {
+      // If there is a single try we must not take into account the time.
+      return numRetry < maxRetries || (maxRetries > 1 &&
+          EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil);
     }
 
     /**
@@ -2656,20 +2665,12 @@ public class HConnectionManager {
      * @return The time to wait before sending next request.
      */
     long calculateBackoffTime(HRegionLocation server, long basePause) {
-      long result = 0;
+      long result;
       ServerErrors errorStats = errorsByServer.get(server);
       if (errorStats != null) {
-        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
-        // Adjust by the time we already waited since last talking to this server.
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        long timeSinceLastError = now - errorStats.getLastErrorTime();
-        if (timeSinceLastError > 0) {
-          result = Math.max(0, result - timeSinceLastError);
-        }
-        // Finally, see if the backoff time overshoots the timeout.
-        if (result > 0 && (now + result > this.canRetryUntil)) {
-          result = Math.max(0, this.canRetryUntil - now);
-        }
+        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
+      } else {
+        result = 0; // yes, if the server is not in our list we don't wait before retrying.
       }
       return result;
     }
@@ -2684,29 +2685,25 @@ public class HConnectionManager {
       if (errors != null) {
         errors.addError();
       } else {
-        errorsByServer.put(server, new ServerErrors());
+        errors = errorsByServer.putIfAbsent(server, new ServerErrors());
+        if (errors != null){
+          errors.addError();
+        }
       }
     }
 
+    String getStartTrackingTime() {
+      return startTrackingTime;
+    }
+
     /**
      * The record of errors for a server.
      */
     private static class ServerErrors {
-      public long lastErrorTime;
-      public int retries;
-
-      public ServerErrors() {
-        this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.retries = 0;
-      }
+      public final AtomicInteger retries = new AtomicInteger(0);
 
       public void addError() {
-        this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
-        ++this.retries;
-      }
-
-      public long getLastErrorTime() {
-        return this.lastErrorTime;
+        retries.incrementAndGet();
       }
     }
   }

Modified: hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
(original)
+++ hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Tue Oct 29 19:50:08 2013
@@ -679,7 +679,7 @@ public class TestAsyncProcess {
     HTable ht = new HTable();
     Configuration configuration = new Configuration(conf);
     configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
-    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
     // set default writeBufferSize
     ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
 
@@ -688,24 +688,21 @@ public class TestAsyncProcess {
     ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
 
 
-    Assert.assertTrue(ht.ap.useServerTrackerForRetries);
     Assert.assertNotNull(ht.ap.createServerErrorTracker());
-    Assert.assertTrue(ht.ap.serverTrackerTimeout > 10000);
+    Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
     ht.ap.serverTrackerTimeout = 1;
 
-
     Put p = createPut(1, false);
     ht.setAutoFlush(false, false);
     ht.put(p);
 
-    long start = System.currentTimeMillis();
     try {
       ht.flushCommits();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
     }
-    // Checking that the ErrorsServers came into play and made us stop immediately
-    Assert.assertTrue((System.currentTimeMillis() - start) < 10000);
+    // Checking that the ErrorsServers came into play and didn't make us stop immediately
+    Assert.assertEquals(ht.ap.tasksSent.get(), 3);
   }
 
   /**
@@ -731,8 +728,7 @@ public class TestAsyncProcess {
     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
     ht.connection = con;
 
-      ht.batch(gets);
-
+    ht.batch(gets);
 
     Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
     Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());

Modified: hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
(original)
+++ hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Tue Oct 29 19:50:08 2013
@@ -510,9 +510,10 @@ public final class HConstants {
    * run out of array items.  Retries beyond this use the last number in the array.  So,
for
    * example, if hbase.client.pause is 1 second, and maximum retries count
    * hbase.client.retries.number is 10, we will retry at the following intervals:
-   * 1, 2, 3, 10, 100, 100, 100, 100, 100, 100.
+   * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100.
+   * With 100ms, a back-off of 200 means 20s
    */
-  public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 100 };
+  public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200,
200 };
 
   public static final String REGION_IMPL = "hbase.hregion.impl";
 
@@ -589,7 +590,7 @@ public final class HConstants {
   /**
    * Default value of {@link #HBASE_CLIENT_MAX_PERSERVER_TASKS}.
    */
-  public static final int DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS = 5;
+  public static final int DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS = 2;
 
   /**
    * The maximum number of concurrent connections the client will maintain to a single

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
Tue Oct 29 19:50:08 2013
@@ -93,13 +93,13 @@ public class ClusterStatusPublisher exte
    * We want to limit the size of the protobuf message sent, do fit into a single packet.
    * a reasonable size for ip / ethernet is less than 1Kb.
    */
-  public static int MAX_SERVER_PER_MESSAGE = 10;
+  public final static int MAX_SERVER_PER_MESSAGE = 10;
 
   /**
    * If a server dies, we're sending the information multiple times in case a receiver misses
the
    * message.
    */
-  public static int NB_SEND = 5;
+  public final static int NB_SEND = 5;
 
   public ClusterStatusPublisher(HMaster master, Configuration conf,
                                 Class<? extends Publisher> publisherClass)

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Tue Oct 29 19:50:08 2013
@@ -2541,7 +2541,13 @@ public class HRegion implements HeapSize
 
     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
       requestFlush();
-      throw new RegionTooBusyException("above memstore limit");
+      throw new RegionTooBusyException("Above memstore limit, " +
+          "regionName=" + (this.getRegionInfo() == null ? "unknown" :
+          this.getRegionInfo().getRegionNameAsString()) +
+          ", server=" + (this.getRegionServerServices() == null ? "unknown" :
+          this.getRegionServerServices().getServerName()) +
+          ", memstoreSize=" + memstoreSize.get() +
+          ", blockingMemStoreSize=" + blockingMemStoreSize);
     }
   }
 
@@ -5381,10 +5387,14 @@ public class HRegion implements HeapSize
       throws RegionTooBusyException, InterruptedIOException {
     try {
       final long waitTime = Math.min(maxBusyWaitDuration,
-        busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
+          busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
         throw new RegionTooBusyException(
-          "failed to get a lock in " + waitTime + "ms");
+            "failed to get a lock in " + waitTime + " ms. " +
+                "regionName=" + (this.getRegionInfo() == null ? "unknown" :
+                this.getRegionInfo().getRegionNameAsString()) +
+                ", server=" + (this.getRegionServerServices() == null ? "unknown" :
+                this.getRegionServerServices().getServerName()));
       }
     } catch (InterruptedException ie) {
       LOG.info("Interrupted while waiting for a lock");

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
(original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
Tue Oct 29 19:50:08 2013
@@ -870,7 +870,7 @@ public class TestHCM {
       long timeBase = timeMachine.currentTimeMillis();
       long largeAmountOfTime = ANY_PAUSE * 1000;
       HConnectionManager.ServerErrorTracker tracker =
-          new HConnectionManager.ServerErrorTracker(largeAmountOfTime);
+          new HConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
 
       // The default backoff is 0.
       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
@@ -912,11 +912,11 @@ public class TestHCM {
       // We also should not go over the boundary; last retry would be on it.
       long timeLeft = (long)(ANY_PAUSE * 0.5);
       timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
-      assertTrue(tracker.canRetryMore());
+      assertTrue(tracker.canRetryMore(1));
       tracker.reportServerError(location);
       assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
       timeMachine.setValue(timeBase + largeAmountOfTime);
-      assertFalse(tracker.canRetryMore());
+      assertFalse(tracker.canRetryMore(1));
     } finally {
       EnvironmentEdgeManager.reset();
     }



Mime
View raw message