hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [30/49] git commit: HBASE-10794 multi-get should handle replica location missing from cache
Date Sat, 28 Jun 2014 00:31:16 GMT
HBASE-10794 multi-get should handle replica location missing from cache

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1586468 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/579f305b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/579f305b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/579f305b

Branch: refs/heads/master
Commit: 579f305bd0fb5673a82d3eeb74660db7a0d6ddd8
Parents: 61bce90
Author: sershe <sershe@unknown>
Authored: Thu Apr 10 20:57:59 2014 +0000
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Jun 27 16:39:39 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 392 ++++++++++++-------
 .../hadoop/hbase/client/ClusterConnection.java  |   6 -
 .../hadoop/hbase/client/ConnectionAdapter.java  |   5 -
 .../hadoop/hbase/client/ConnectionManager.java  |  14 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  17 +-
 .../hbase/client/CoprocessorHConnection.java    |   5 -
 .../hbase/client/HConnectionTestingUtility.java |   5 +-
 7 files changed, 273 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 3c6f5be..c184147 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -346,7 +347,15 @@ class AsyncProcess {
         Row r = it.next();
         HRegionLocation loc;
         try {
-          loc = findDestLocation(tableName, r, true).getDefaultRegionLocation();
+          if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
+          // Make sure we get 0-s replica.
+          RegionLocations locs = hConnection.locateRegion(
+              tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
+            throw new IOException("#" + id + ", no location found, aborting submit for" +
+                " tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow()));
+          }
+          loc = locs.getDefaultRegionLocation();
         } catch (IOException ex) {
           locationErrors = new ArrayList<Exception>();
           locationErrorRows = new ArrayList<Integer>();
@@ -381,10 +390,11 @@ class AsyncProcess {
       for (int i = 0; i < locationErrors.size(); ++i) {
         int originalIndex = locationErrorRows.get(i);
         Row row = retainedActions.get(originalIndex).getAction();
-        ars.manageError(originalIndex, row, false, locationErrors.get(i), null);
+        ars.manageError(originalIndex, row,
+            Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
       }
     }
-    ars.sendMultiAction(actionsByServer, 1, null);
+    ars.sendMultiAction(actionsByServer, 1, null, false);
     return ars;
   }
 
@@ -411,24 +421,6 @@ class AsyncProcess {
   }
 
   /**
-   * Find the destination.
-   * @param tableName the requisite table.
-   * @param row the row
-   * @return the destination.
-   */
-  private RegionLocations findDestLocation(
-      TableName tableName, Row row, boolean checkPrimary) throws IOException {
-    if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
-    RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
-    if (loc == null
-        || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) {
-      throw new IOException("#" + id + ", no location found, aborting submit for" +
-          " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
-    }
-    return loc;
-  }
-
-  /**
    * Check if we should send new operations to this region or region server.
    * We're taking into account the past decision; if we have already accepted
    * operation on a given region, we accept all operations for this region.
@@ -585,17 +577,29 @@ class AsyncProcess {
         if (done) return; // Done within primary timeout
         Map<ServerName, MultiAction<Row>> actionsByServer =
             new HashMap<ServerName, MultiAction<Row>>();
+        List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
         if (replicaGetIndices == null) {
           for (int i = 0; i < results.length; ++i) {
-            addReplicaActions(i, actionsByServer);
+            addReplicaActions(i, actionsByServer, unknownLocActions);
           }
         } else {
           for (int i = 0; i < replicaGetIndices.length; ++i) {
-            addReplicaActions(replicaGetIndices[i], actionsByServer);
+            addReplicaActions(replicaGetIndices[i], actionsByServer, unknownLocActions);
+          }
+        }
+        if (!actionsByServer.isEmpty()) {
+          sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
+        }
+        if (!unknownLocActions.isEmpty()) {
+          actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
+          for (Action<Row> action : unknownLocActions) {
+            addReplicaActionsAgain(action, actionsByServer);
+          }
+          // Some actions may have completely failed, they are handled inside addAgain.
+          if (!actionsByServer.isEmpty()) {
+            sendMultiAction(actionsByServer, 1, null, true);
           }
         }
-        if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found.
-        sendMultiAction(actionsByServer, 1, null);
       }
 
       /**
@@ -603,33 +607,14 @@ class AsyncProcess {
        * @param index Index of the original action.
        * @param actionsByServer The map by server to add it to.
        */
-      private void addReplicaActions(
-          int index, Map<ServerName, MultiAction<Row>> actionsByServer) {
+      private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
+          List<Action<Row>> unknownReplicaActions) {
         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
         Action<Row> action = initialActions.get(index);
-        RegionLocations loc = null;
-        try {
-          // For perf, we assume that this location coming from cache, since we just got location
-          // from meta for the primary call. If it turns out to not be the case, we'd need local
-          // cache since we want to keep as little time as possible before replica call.
-          loc = findDestLocation(tableName, action.getAction(), false);
-        } catch (IOException ex) {
-          manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
-          LOG.error("Cannot get location - no replica calls for some actions", ex);
-          return;
-        }
+        RegionLocations loc = findAllLocationsOrFail(action, true);
+        if (loc == null) return;
         HRegionLocation[] locs = loc.getRegionLocations();
-        int replicaCount = 0;
-        for (int i = 1; i < locs.length; ++i) {
-          replicaCount += (locs[i] != null) ? 1 : 0;
-        }
-        if (replicaCount == 0) {
-          // we could have got the replica back (if the server went down and the replica moved)
-          try {
-            loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true);
-          } catch (IOException e) {
-            manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
-          }
+        if (locs.length == 1) {
           LOG.warn("No replicas found for " + action.getAction());
           return;
         }
@@ -639,13 +624,29 @@ class AsyncProcess {
           // but that would require additional synchronization w.r.t. returning to caller.
           if (results[index] != null) return;
           // We set the number of calls here. After that any path must call setResult/setError.
-          results[index] = new ReplicaResultState(replicaCount + 1);
+          // True even for replicas that are not found - if we refuse to send we MUST set error.
+          results[index] = new ReplicaResultState(locs.length);
         }
         for (int i = 1; i < locs.length; ++i) {
-          if (locs[i] == null) continue;
-          addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
-              new Action<Row>(action, i), actionsByServer, nonceGroup);
+          Action<Row> replicaAction = new Action<Row>(action, i);
+          if (locs[i] != null) {
+            addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
+                replicaAction, actionsByServer, nonceGroup);
+          } else {
+            unknownReplicaActions.add(replicaAction);
+          }
+        }
+      }
+
+      private void addReplicaActionsAgain(
+          Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
+        if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+          throw new AssertionError("Cannot have default replica here");
         }
+        HRegionLocation loc = getReplicaLocationOrFail(action);
+        if (loc == null) return;
+        addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
+            action, actionsByServer, nonceGroup);
       }
     }
 
@@ -797,22 +798,14 @@ class AsyncProcess {
      * @param numAttempt - the current numAttempt (first attempt is 1)
      */
     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
-      // group per location => regions server
-      final Map<ServerName, MultiAction<Row>> actionsByServer =
+      Map<ServerName, MultiAction<Row>> actionsByServer =
           new HashMap<ServerName, MultiAction<Row>>();
 
       boolean isReplica = false;
+      List<Action<Row>> unknownReplicaActions = null;
       for (Action<Row> action : currentActions) {
-        RegionLocations locs = null;
-        try {
-          locs = findDestLocation(tableName, action.getAction(), false);
-        } catch (IOException ex) {
-          // There are multiple retries in locateRegion already. No need to add new.
-          // We can't continue with this row, hence it's the last retry.
-          manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
-          continue;
-        }
-
+        RegionLocations locs = findAllLocationsOrFail(action, true);
+        if (locs == null) continue;
         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
         if (isReplica && !isReplicaAction) {
           // This is the property of the current implementation, not a requirement.
@@ -821,34 +814,89 @@ class AsyncProcess {
         isReplica = isReplicaAction;
         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
         if (loc == null || loc.getServerName() == null) {
-          try {
-            locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId());
-            loc = locs.getRegionLocation(action.getReplicaId());
-          } catch (IOException e) {
-            // There are multiple retries in locateRegion already. No need to add new.
-            // We can't continue with this row, hence it's the last retry.
-            manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
-            continue;
-          }
-          if (loc == null || loc.getServerName() == null) {
-            // On retry, we couldn't find location for some replica we saw before.
-            String str = "Cannot find location for replica " + action.getReplicaId();
-            LOG.error(str);
-            manageError(action.getOriginalIndex(), action.getAction(),
-                false, new IOException(str), null);
-            continue;
+          if (isReplica) {
+            if (unknownReplicaActions == null) {
+              unknownReplicaActions = new ArrayList<Action<Row>>();
+            }
+            unknownReplicaActions.add(action);
+          } else {
+            // TODO: relies on primary location always being fetched
+            manageLocationError(action, null);
           }
+        } else {
+          byte[] regionName = loc.getRegionInfo().getRegionName();
+          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
         }
-        byte[] regionName = loc.getRegionInfo().getRegionName();
-        addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
       }
-      // If this is a first attempt to group and send, no replicas, we need replica thread.
+      boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
+      boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
+
       if (!actionsByServer.isEmpty()) {
-        boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
-        sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null);
+        // If this is a first attempt to group and send, no replicas, we need replica thread.
+        sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
+            ? currentActions : null, numAttempt > 1 && !hasUnknown);
+      }
+
+      if (hasUnknown) {
+        actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
+        for (Action<Row> action : unknownReplicaActions) {
+          HRegionLocation loc = getReplicaLocationOrFail(action);
+          if (loc == null) continue;
+          byte[] regionName = loc.getRegionInfo().getRegionName();
+          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
+        }
+        if (!actionsByServer.isEmpty()) {
+          sendMultiAction(
+              actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
+        }
+      }
+    }
+
+    private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
+      // We are going to try get location once again. For each action, we'll do it once
+      // from cache, because the previous calls in the loop might populate it.
+      int replicaId = action.getReplicaId();
+      RegionLocations locs = findAllLocationsOrFail(action, true);
+      if (locs == null) return null; // manageError already called
+      HRegionLocation loc = locs.getRegionLocation(replicaId);
+      if (loc == null || loc.getServerName() == null) {
+        locs = findAllLocationsOrFail(action, false);
+        if (locs == null) return null; // manageError already called
+        loc = locs.getRegionLocation(replicaId);
+      }
+      if (loc == null || loc.getServerName() == null) {
+        manageLocationError(action, null);
+        return null;
       }
+      return loc;
     }
 
+    private void manageLocationError(Action<Row> action, Exception ex) {
+      String msg = "Cannot get replica " + action.getReplicaId()
+          + " location for " + action.getAction();
+      LOG.error(msg);
+      if (ex == null) {
+        ex = new IOException(msg);
+      }
+      manageError(action.getOriginalIndex(), action.getAction(),
+          Retry.NO_LOCATION_PROBLEM, ex, null);
+    }
+
+    private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
+      if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
+          ", row cannot be null");
+      RegionLocations loc = null;
+      try {
+        loc = hConnection.locateRegion(
+            tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
+      } catch (IOException ex) {
+        manageLocationError(action, ex);
+      }
+      return loc;
+    }
+
+
+
     /**
      * Send a multi action structure to the servers, after a delay depending on the attempt
      * number. Asynchronous.
@@ -858,7 +906,7 @@ class AsyncProcess {
      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
      */
     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
-        int numAttempt, List<Action<Row>> actionsForReplicaThread) {
+        int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
       // Run the last item on the same thread if we are already on a send thread.
       // We hope most of the time it will be the only item, so we can cut down on threads.
       int actionsRemaining = actionsByServer.size();
@@ -869,8 +917,7 @@ class AsyncProcess {
         incTaskCounters(multiAction.getRegions(), server);
         Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
             new SingleServerRequestRunnable(multiAction, numAttempt, server));
-        --actionsRemaining;
-        if ((numAttempt > 1) && actionsRemaining == 0) {
+        if ((--actionsRemaining == 0) && reuseThread) {
           runnable.run();
         } else {
           try {
@@ -923,21 +970,19 @@ class AsyncProcess {
      * @param server        the location, if any (can be null)
      * @return true if the action can be retried, false otherwise.
      */
-    public boolean manageError(int originalIndex, Row row, boolean canRetry,
+    public Retry manageError(int originalIndex, Row row, Retry canRetry,
                                 Throwable throwable, ServerName server) {
-      if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
-        canRetry = false;
+      if (canRetry == Retry.YES
+          && throwable != null && throwable instanceof DoNotRetryIOException) {
+        canRetry = Retry.NO_NOT_RETRIABLE;
       }
 
-      if (!canRetry) {
+      if (canRetry != Retry.YES) {
         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
         setError(originalIndex, row, throwable, server);
-      } else {
-        // See if we are dealing with a replica action that was completed from other server.
-        // Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
-        canRetry = !isActionComplete(originalIndex, row);
+      } else if (isActionComplete(originalIndex, row)) {
+        canRetry = Retry.NO_OTHER_SUCCEEDED;
       }
-
       return canRetry;
     }
 
@@ -952,8 +997,12 @@ class AsyncProcess {
     private void receiveGlobalFailure(
         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
       errorsByServer.reportServerError(server);
-      boolean canRetry = errorsByServer.canRetryMore(numAttempt);
+      Retry canRetry = errorsByServer.canRetryMore(numAttempt)
+          ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
 
+      int failed = 0, stopped = 0;
+      boolean isReplica = false;
+      boolean firstAction = false;
       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
         byte[] regionName = e.getKey();
@@ -963,35 +1012,36 @@ class AsyncProcess {
         // TODO: depending on type of exception we might not want to update cache at all?
         hConnection.updateCachedLocations(tableName, regionName, row, null, server);
         for (Action<Row> action : e.getValue()) {
-          if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
+          if (firstAction) {
+            firstAction = false;
+            isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+          }
+          Retry retry = manageError(
+              action.getOriginalIndex(), action.getAction(), canRetry, t, server);
+          if (retry == Retry.YES) {
             toReplay.add(action);
+          } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
+            ++stopped;
+          } else {
+            ++failed;
           }
         }
       }
 
-      logAndResubmit(server, toReplay, numAttempt, rsActions.size(), t);
+      if (toReplay.isEmpty()) {
+        logNoResubmit(server, numAttempt, rsActions.size(), t, isReplica, failed, stopped);
+      } else {
+        resubmit(server, toReplay, numAttempt, rsActions.size(), t, isReplica);
+      }
     }
 
     /**
      * Log as much info as possible, and, if there is something to replay,
      * submit it again after a back off sleep.
+     * @param isReplica
      */
-    private void logAndResubmit(ServerName oldServer, List<Action<Row>> toReplay,
-        int numAttempt, int failureCount, Throwable throwable) {
-      if (toReplay.isEmpty()) {
-        // it's either a success or a last failure
-        if (failureCount != 0) {
-          // We have a failure but nothing to retry. We're done, it's a final failure..
-          LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
-              oldServer, throwable, -1, false, errorsByServer.getStartTrackingTime()));
-        } else if (numAttempt > startLogErrorsCnt + 1) {
-          // The operation was successful, but needed several attempts. Let's log this.
-          LOG.info(createLog(numAttempt, failureCount, 0,
-              oldServer, throwable, -1, false, errorsByServer.getStartTrackingTime()));
-        }
-        return;
-      }
-
+    private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
+        int numAttempt, int failureCount, Throwable throwable, boolean isReplica) {
       // We have something to replay. We're going to sleep a little before.
 
       // We have two contradicting needs here:
@@ -1004,7 +1054,7 @@ class AsyncProcess {
         // We use this value to have some logs when we have multiple failures, but not too many
         //  logs, as errors are to be expected when a region moves, splits and so on
         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
-            oldServer, throwable, backOffTime, true, errorsByServer.getStartTrackingTime()));
+            oldServer, throwable, backOffTime, true, null, isReplica, -1, -1));
       }
 
       try {
@@ -1018,6 +1068,21 @@ class AsyncProcess {
       groupAndSendMultiAction(toReplay, numAttempt + 1);
     }
 
+    private void logNoResubmit(ServerName oldServer, int numAttempt,
+        int failureCount, Throwable throwable, boolean isReplica, int failed, int stopped) {
+      if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
+        String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
+        String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
+            throwable, -1, false, timeStr, isReplica, failed, stopped);
+        if (failed != 0) {
+          // Only log final failures as warning
+          LOG.warn(logMessage);
+        } else {
+          LOG.info(logMessage);
+        }
+      }
+    }
+
     /**
      * Called when we receive the result of a server query.
      *
@@ -1042,6 +1107,9 @@ class AsyncProcess {
       boolean canRetry = true;
 
       // Go by original action.
+      int failed = 0, stopped = 0;
+      boolean isReplica = false;
+      boolean firstAction = false;
       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
         byte[] regionName = regionEntry.getKey();
         Map<Integer, Object> regionResults = responses.getResults().get(regionName);
@@ -1055,6 +1123,10 @@ class AsyncProcess {
         }
         boolean regionFailureRegistered = false;
         for (Action<Row> sentAction : regionEntry.getValue()) {
+          if (firstAction) {
+            firstAction = false;
+            isReplica = !RegionReplicaUtil.isDefaultReplica(sentAction.getReplicaId());
+          }
           Object result = regionResults.get(sentAction.getOriginalIndex());
           // Failure: retry if it's make sense else update the errors lists
           if (result == null || result instanceof Throwable) {
@@ -1071,9 +1143,14 @@ class AsyncProcess {
               canRetry = errorsByServer.canRetryMore(numAttempt);
             }
             ++failureCount;
-            if (manageError(
-                sentAction.getOriginalIndex(), row, canRetry, (Throwable)result, server)) {
+            Retry retry = manageError(sentAction.getOriginalIndex(), row,
+                canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
+            if (retry == Retry.YES) {
               toReplay.add(sentAction);
+            } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
+              ++stopped;
+            } else {
+              ++failed;
             }
           } else {
             if (callback != null) {
@@ -1111,39 +1188,58 @@ class AsyncProcess {
         failureCount += actions.size();
 
         for (Action<Row> action : actions) {
+          if (firstAction) {
+            firstAction = false;
+            isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+          }
           Row row = action.getAction();
-          if (manageError(action.getOriginalIndex(), row, canRetry, throwable, server)) {
+          Retry retry = manageError(action.getOriginalIndex(), row,
+              canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
+          if (retry == Retry.YES) {
             toReplay.add(action);
+          } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
+            ++stopped;
+          } else {
+            ++failed;
           }
         }
       }
 
-      logAndResubmit(server, toReplay, numAttempt, failureCount, throwable);
+      if (toReplay.isEmpty()) {
+        logNoResubmit(server, numAttempt, failureCount, throwable, isReplica, failed, stopped);
+      } else {
+        resubmit(server, toReplay, numAttempt, failureCount, throwable, isReplica);
+      }
     }
 
     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
-                             Throwable error, long backOffTime, boolean willRetry, String startTime){
+        Throwable error, long backOffTime, boolean willRetry, String startTime,
+        boolean isReplica, int failed, int stopped) {
       StringBuilder sb = new StringBuilder();
-
-      sb.append("#").append(id).append(", table=").append(tableName).
-          append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
+      sb.append("#").append(id).append(", table=").append(tableName).append(", ")
+        .append(isReplica ? "replica, " : "primary, ").append("attempt=").append(numAttempt)
+        .append("/").append(numTries).append(" ");
 
       if (failureCount > 0 || error != null){
         sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
             append(error == null ? "null" : error);
       } else {
-        sb.append("SUCCEEDED");
+        sb.append("succeeded");
       }
 
-      sb.append(" on ").append(sn);
-
-      sb.append(", tracking started ").append(startTime);
+      sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
 
       if (willRetry) {
         sb.append(", retrying after ").append(backOffTime).append(" ms").
-            append(", replay ").append(replaySize).append(" ops.");
+            append(", replay ").append(replaySize).append(" ops");
       } else if (failureCount > 0) {
-        sb.append(" - FAILED, NOT RETRYING ANYMORE");
+        if (stopped > 0) {
+          sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
+        }
+        if (failed > 0) {
+          sb.append("; not retrying ").append(failed).append(" - final failure");
+        }
+
       }
 
       return sb.toString();
@@ -1165,7 +1261,7 @@ class AsyncProcess {
          decActionCounter(index);
          return; // Simple case, no replica requests.
       } else if ((state = trySetResultSimple(
-          index, action.getAction(), result, isStale)) == null) {
+          index, action.getAction(), false, result, null, isStale)) == null) {
         return; // Simple case, no replica requests.
       }
       assert state != null;
@@ -1204,8 +1300,8 @@ class AsyncProcess {
         errors.add(throwable, row, server);
         decActionCounter(index);
         return; // Simple case, no replica requests.
-      } else if ((state = trySetResultSimple(index, row, throwable, false)) == null) {
-        errors.add(throwable, row, server);
+      } else if ((state = trySetResultSimple(
+          index, row, true, throwable, server, false)) == null) {
         return; // Simple case, no replica requests.
       }
       assert state != null;
@@ -1264,8 +1360,8 @@ class AsyncProcess {
      * Tries to set the result or error for a particular action as if there were no replica calls.
      * @return null if successful; replica state if there were in fact replica calls.
      */
-    private ReplicaResultState trySetResultSimple(
-        int index, Row row, Object result, boolean isFromReplica) {
+    private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
+        Object result, ServerName server, boolean isFromReplica) {
       Object resObj = null;
       if (!isReplicaGet(row)) {
         if (isFromReplica) {
@@ -1282,11 +1378,20 @@ class AsyncProcess {
           }
         }
       }
+
+      ReplicaResultState rrs =
+          (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
+      if (rrs == null && isError) {
+        // The resObj is not replica state (null or already set).
+        errors.add((Throwable)result, row, server);
+      }
+
       if (resObj == null) {
+        // resObj is null - no replica calls were made.
         decActionCounter(index);
         return null;
       }
-      return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
+      return rrs;
     }
 
     private void decActionCounter(int index) {
@@ -1528,4 +1633,15 @@ class AsyncProcess {
   private static boolean isReplicaGet(Row row) {
     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
   }
+
+  /**
+   * For manageError. Only used to make logging more clear, we don't actually care why we don't retry.
+   */
+  private enum Retry {
+    YES,
+    NO_LOCATION_PROBLEM,
+    NO_NOT_RETRIABLE,
+    NO_RETRIES_EXHAUSTED,
+    NO_OTHER_SUCCEEDED
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index ef9a120..c75687a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -269,10 +269,4 @@ interface ClusterConnection extends HConnection {
    * @return Default AsyncProcess associated with this connection.
    */
   AsyncProcess getAsyncProcess();
-
-  /**
-   * @return All locations for a particular region.
-   */
-  RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index 30555ff..c57064b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -210,11 +210,6 @@ class ConnectionAdapter implements ClusterConnection {
   }
 
   @Override
-  public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException
{
-    return wrappedConnection.locateRegionAll(tableName, row);
-  }
-
-  @Override
   public void clearRegionCache() {
     wrappedConnection.clearRegionCache();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 2d97ebf..ff30ef4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -1028,15 +1028,9 @@ class ConnectionManager {
     }
 
     @Override
-    public RegionLocations locateRegionAll(
-        final TableName tableName, final byte[] row) throws IOException{
-      return locateRegion(tableName, row, true, true);
-    }
-
-    @Override
     public HRegionLocation locateRegion(
         final TableName tableName, final byte[] row) throws IOException{
-      RegionLocations locations = locateRegionAll(tableName, row);
+      RegionLocations locations = locateRegion(tableName, row, true, true);
       return locations == null ? null : locations.getRegionLocation();
     }
 
@@ -2468,12 +2462,12 @@ class ConnectionManager {
         new ConcurrentHashMap<ServerName, ServerErrors>();
     private final long canRetryUntil;
     private final int maxRetries;
-    private final String startTrackingTime;
+    private final long startTrackingTime;
 
     public ServerErrorTracker(long timeout, int maxRetries) {
       this.maxRetries = maxRetries;
       this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
-      this.startTrackingTime = new Date().toString();
+      this.startTrackingTime = new Date().getTime();
     }
 
     /**
@@ -2520,7 +2514,7 @@ class ConnectionManager {
       }
     }
 
-    String getStartTrackingTime() {
+    long getStartTrackingTime() {
       return startTrackingTime;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index cc69b4f..a0d80c6 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -37,8 +37,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -344,7 +346,8 @@ public class TestAsyncProcess {
     }
 
     @Override
-    public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException
{
+    public RegionLocations locateRegion(TableName tableName,
+        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
       return new RegionLocations(loc1);
     }
   }
@@ -363,7 +366,8 @@ public class TestAsyncProcess {
     }
 
     @Override
-    public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException
{
+    public RegionLocations locateRegion(TableName tableName,
+        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
       int i = 0;
       for (HRegionLocation hr : hrl){
         if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
@@ -377,6 +381,9 @@ public class TestAsyncProcess {
 
   }
 
+  @Rule
+  public Timeout timeout = new Timeout(10000); // 10 seconds max per method tested
+
   @Test
   public void testSubmit() throws Exception {
     ClusterConnection hc = createHConnection();
@@ -627,8 +634,8 @@ public class TestAsyncProcess {
 
   private static void setMockLocation(ClusterConnection hc, byte[] row,
       RegionLocations result) throws IOException {
-    Mockito.when(hc.locateRegionAll(
-        Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result);
+    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
+        Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
   }
 
   private static ClusterConnection createHConnectionCommon() {
@@ -1009,7 +1016,7 @@ public class TestAsyncProcess {
     for (int i = 0; i < expecteds.length; ++i) {
       Object actual = actuals[i];
       RR expected = expecteds[i];
-      Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable);
+      Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
       if (expected != RR.FAILED && expected != RR.DONT_CARE) {
         Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index aef9f4e..a512f83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -443,9 +443,4 @@ class CoprocessorHConnection implements ClusterConnection {
   public AsyncProcess getAsyncProcess() {
     return delegate.getAsyncProcess();
   }
-
-  @Override
-  public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException
{
-    return delegate.locateRegionAll(tableName, row);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index f925fea..0f0104a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -111,8 +111,9 @@ public class HConnectionTestingUtility {
       thenReturn(loc);
     Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
       thenReturn(loc);
-    Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())).
-      thenReturn(new RegionLocations(loc));
+    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any(),
+        Mockito.anyBoolean(), Mockito.anyBoolean(),  Mockito.anyInt()))
+        .thenReturn(new RegionLocations(loc));
     if (admin != null) {
       // If a call to getAdmin, return this implementation.
       Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).


Mime
View raw message