Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E086F11780 for ; Sat, 28 Jun 2014 00:30:53 +0000 (UTC) Received: (qmail 66965 invoked by uid 500); 28 Jun 2014 00:30:48 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 66867 invoked by uid 500); 28 Jun 2014 00:30:48 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 66199 invoked by uid 99); 28 Jun 2014 00:30:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 Jun 2014 00:30:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 34B2332B3AF; Sat, 28 Jun 2014 00:30:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: enis@apache.org To: commits@hbase.apache.org Date: Sat, 28 Jun 2014 00:31:16 -0000 Message-Id: <39612124ffbf4ecf9b3434976af1becd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/49] git commit: HBASE-10794 multi-get should handle replica location missing from cache HBASE-10794 multi-get should handle replica location missing from cache git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1586468 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/579f305b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/579f305b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/579f305b Branch: refs/heads/master Commit: 579f305bd0fb5673a82d3eeb74660db7a0d6ddd8 Parents: 61bce90 Author: sershe Authored: Thu Apr 10 20:57:59 2014 +0000 Committer: Enis Soztutar Committed: Fri Jun 27 16:39:39 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 392 ++++++++++++------- .../hadoop/hbase/client/ClusterConnection.java | 6 - .../hadoop/hbase/client/ConnectionAdapter.java | 5 - .../hadoop/hbase/client/ConnectionManager.java | 14 +- .../hadoop/hbase/client/TestAsyncProcess.java | 17 +- .../hbase/client/CoprocessorHConnection.java | 5 - .../hbase/client/HConnectionTestingUtility.java | 5 +- 7 files changed, 273 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 3c6f5be..c184147 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -24,6 +24,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -346,7 +347,15 @@ class AsyncProcess { Row r = it.next(); HRegionLocation loc; try { - loc = findDestLocation(tableName, r, true).getDefaultRegionLocation(); + if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); + // Make sure we get 0-s replica. + RegionLocations locs = hConnection.locateRegion( + tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); + if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { + throw new IOException("#" + id + ", no location found, aborting submit for" + + " tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow())); + } + loc = locs.getDefaultRegionLocation(); } catch (IOException ex) { locationErrors = new ArrayList(); locationErrorRows = new ArrayList(); @@ -381,10 +390,11 @@ class AsyncProcess { for (int i = 0; i < locationErrors.size(); ++i) { int originalIndex = locationErrorRows.get(i); Row row = retainedActions.get(originalIndex).getAction(); - ars.manageError(originalIndex, row, false, locationErrors.get(i), null); + ars.manageError(originalIndex, row, + Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); } } - ars.sendMultiAction(actionsByServer, 1, null); + ars.sendMultiAction(actionsByServer, 1, null, false); return ars; } @@ -411,24 +421,6 @@ class AsyncProcess { } /** - * Find the destination. - * @param tableName the requisite table. - * @param row the row - * @return the destination. - */ - private RegionLocations findDestLocation( - TableName tableName, Row row, boolean checkPrimary) throws IOException { - if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); - RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow()); - if (loc == null - || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) { - throw new IOException("#" + id + ", no location found, aborting submit for" + - " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow())); - } - return loc; - } - - /** * Check if we should send new operations to this region or region server. * We're taking into account the past decision; if we have already accepted * operation on a given region, we accept all operations for this region. @@ -585,17 +577,29 @@ class AsyncProcess { if (done) return; // Done within primary timeout Map> actionsByServer = new HashMap>(); + List> unknownLocActions = new ArrayList>(); if (replicaGetIndices == null) { for (int i = 0; i < results.length; ++i) { - addReplicaActions(i, actionsByServer); + addReplicaActions(i, actionsByServer, unknownLocActions); } } else { for (int i = 0; i < replicaGetIndices.length; ++i) { - addReplicaActions(replicaGetIndices[i], actionsByServer); + addReplicaActions(replicaGetIndices[i], actionsByServer, unknownLocActions); + } + } + if (!actionsByServer.isEmpty()) { + sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); + } + if (!unknownLocActions.isEmpty()) { + actionsByServer = new HashMap>(); + for (Action action : unknownLocActions) { + addReplicaActionsAgain(action, actionsByServer); + } + // Some actions may have completely failed, they are handled inside addAgain. + if (!actionsByServer.isEmpty()) { + sendMultiAction(actionsByServer, 1, null, true); } } - if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found. - sendMultiAction(actionsByServer, 1, null); } /** @@ -603,33 +607,14 @@ class AsyncProcess { * @param index Index of the original action. * @param actionsByServer The map by server to add it to. */ - private void addReplicaActions( - int index, Map> actionsByServer) { + private void addReplicaActions(int index, Map> actionsByServer, + List> unknownReplicaActions) { if (results[index] != null) return; // opportunistic. Never goes from non-null to null. Action action = initialActions.get(index); - RegionLocations loc = null; - try { - // For perf, we assume that this location coming from cache, since we just got location - // from meta for the primary call. If it turns out to not be the case, we'd need local - // cache since we want to keep as little time as possible before replica call. - loc = findDestLocation(tableName, action.getAction(), false); - } catch (IOException ex) { - manageError(action.getOriginalIndex(), action.getAction(), false, ex, null); - LOG.error("Cannot get location - no replica calls for some actions", ex); - return; - } + RegionLocations loc = findAllLocationsOrFail(action, true); + if (loc == null) return; HRegionLocation[] locs = loc.getRegionLocations(); - int replicaCount = 0; - for (int i = 1; i < locs.length; ++i) { - replicaCount += (locs[i] != null) ? 1 : 0; - } - if (replicaCount == 0) { - // we could have got the replica back (if the server went down and the replica moved) - try { - loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true); - } catch (IOException e) { - manageError(action.getOriginalIndex(), action.getAction(), false, e, null); - } + if (locs.length == 1) { LOG.warn("No replicas found for " + action.getAction()); return; } @@ -639,13 +624,29 @@ class AsyncProcess { // but that would require additional synchronization w.r.t. returning to caller. if (results[index] != null) return; // We set the number of calls here. After that any path must call setResult/setError. - results[index] = new ReplicaResultState(replicaCount + 1); + // True even for replicas that are not found - if we refuse to send we MUST set error. + results[index] = new ReplicaResultState(locs.length); } for (int i = 1; i < locs.length; ++i) { - if (locs[i] == null) continue; - addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), - new Action(action, i), actionsByServer, nonceGroup); + Action replicaAction = new Action(action, i); + if (locs[i] != null) { + addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), + replicaAction, actionsByServer, nonceGroup); + } else { + unknownReplicaActions.add(replicaAction); + } + } + } + + private void addReplicaActionsAgain( + Action action, Map> actionsByServer) { + if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { + throw new AssertionError("Cannot have default replica here"); } + HRegionLocation loc = getReplicaLocationOrFail(action); + if (loc == null) return; + addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), + action, actionsByServer, nonceGroup); } } @@ -797,22 +798,14 @@ class AsyncProcess { * @param numAttempt - the current numAttempt (first attempt is 1) */ private void groupAndSendMultiAction(List> currentActions, int numAttempt) { - // group per location => regions server - final Map> actionsByServer = + Map> actionsByServer = new HashMap>(); boolean isReplica = false; + List> unknownReplicaActions = null; for (Action action : currentActions) { - RegionLocations locs = null; - try { - locs = findDestLocation(tableName, action.getAction(), false); - } catch (IOException ex) { - // There are multiple retries in locateRegion already. No need to add new. - // We can't continue with this row, hence it's the last retry. - manageError(action.getOriginalIndex(), action.getAction(), false, ex, null); - continue; - } - + RegionLocations locs = findAllLocationsOrFail(action, true); + if (locs == null) continue; boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); if (isReplica && !isReplicaAction) { // This is the property of the current implementation, not a requirement. @@ -821,34 +814,89 @@ class AsyncProcess { isReplica = isReplicaAction; HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); if (loc == null || loc.getServerName() == null) { - try { - locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId()); - loc = locs.getRegionLocation(action.getReplicaId()); - } catch (IOException e) { - // There are multiple retries in locateRegion already. No need to add new. - // We can't continue with this row, hence it's the last retry. - manageError(action.getOriginalIndex(), action.getAction(), false, e, null); - continue; - } - if (loc == null || loc.getServerName() == null) { - // On retry, we couldn't find location for some replica we saw before. - String str = "Cannot find location for replica " + action.getReplicaId(); - LOG.error(str); - manageError(action.getOriginalIndex(), action.getAction(), - false, new IOException(str), null); - continue; + if (isReplica) { + if (unknownReplicaActions == null) { + unknownReplicaActions = new ArrayList>(); + } + unknownReplicaActions.add(action); + } else { + // TODO: relies on primary location always being fetched + manageLocationError(action, null); } + } else { + byte[] regionName = loc.getRegionInfo().getRegionName(); + addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); } - byte[] regionName = loc.getRegionInfo().getRegionName(); - addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); } - // If this is a first attempt to group and send, no replicas, we need replica thread. + boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); + boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty(); + if (!actionsByServer.isEmpty()) { - boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); - sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null); + // If this is a first attempt to group and send, no replicas, we need replica thread. + sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown) + ? currentActions : null, numAttempt > 1 && !hasUnknown); + } + + if (hasUnknown) { + actionsByServer = new HashMap>(); + for (Action action : unknownReplicaActions) { + HRegionLocation loc = getReplicaLocationOrFail(action); + if (loc == null) continue; + byte[] regionName = loc.getRegionInfo().getRegionName(); + addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); + } + if (!actionsByServer.isEmpty()) { + sendMultiAction( + actionsByServer, numAttempt, doStartReplica ? currentActions : null, true); + } + } + } + + private HRegionLocation getReplicaLocationOrFail(Action action) { + // We are going to try get location once again. For each action, we'll do it once + // from cache, because the previous calls in the loop might populate it. + int replicaId = action.getReplicaId(); + RegionLocations locs = findAllLocationsOrFail(action, true); + if (locs == null) return null; // manageError already called + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null || loc.getServerName() == null) { + locs = findAllLocationsOrFail(action, false); + if (locs == null) return null; // manageError already called + loc = locs.getRegionLocation(replicaId); + } + if (loc == null || loc.getServerName() == null) { + manageLocationError(action, null); + return null; } + return loc; } + private void manageLocationError(Action action, Exception ex) { + String msg = "Cannot get replica " + action.getReplicaId() + + " location for " + action.getAction(); + LOG.error(msg); + if (ex == null) { + ex = new IOException(msg); + } + manageError(action.getOriginalIndex(), action.getAction(), + Retry.NO_LOCATION_PROBLEM, ex, null); + } + + private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { + if (action.getAction() == null) throw new IllegalArgumentException("#" + id + + ", row cannot be null"); + RegionLocations loc = null; + try { + loc = hConnection.locateRegion( + tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); + } catch (IOException ex) { + manageLocationError(action, ex); + } + return loc; + } + + + /** * Send a multi action structure to the servers, after a delay depending on the attempt * number. Asynchronous. @@ -858,7 +906,7 @@ class AsyncProcess { * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ private void sendMultiAction(Map> actionsByServer, - int numAttempt, List> actionsForReplicaThread) { + int numAttempt, List> actionsForReplicaThread, boolean reuseThread) { // Run the last item on the same thread if we are already on a send thread. // We hope most of the time it will be the only item, so we can cut down on threads. int actionsRemaining = actionsByServer.size(); @@ -869,8 +917,7 @@ class AsyncProcess { incTaskCounters(multiAction.getRegions(), server); Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new SingleServerRequestRunnable(multiAction, numAttempt, server)); - --actionsRemaining; - if ((numAttempt > 1) && actionsRemaining == 0) { + if ((--actionsRemaining == 0) && reuseThread) { runnable.run(); } else { try { @@ -923,21 +970,19 @@ class AsyncProcess { * @param server the location, if any (can be null) * @return true if the action can be retried, false otherwise. */ - public boolean manageError(int originalIndex, Row row, boolean canRetry, + public Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwable, ServerName server) { - if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) { - canRetry = false; + if (canRetry == Retry.YES + && throwable != null && throwable instanceof DoNotRetryIOException) { + canRetry = Retry.NO_NOT_RETRIABLE; } - if (!canRetry) { + if (canRetry != Retry.YES) { // Batch.Callback was not called on failure in 0.94. We keep this. setError(originalIndex, row, throwable, server); - } else { - // See if we are dealing with a replica action that was completed from other server. - // Doesn't have to be synchronized, worst case we'd retry and be unable to set result. - canRetry = !isActionComplete(originalIndex, row); + } else if (isActionComplete(originalIndex, row)) { + canRetry = Retry.NO_OTHER_SUCCEEDED; } - return canRetry; } @@ -952,8 +997,12 @@ class AsyncProcess { private void receiveGlobalFailure( MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { errorsByServer.reportServerError(server); - boolean canRetry = errorsByServer.canRetryMore(numAttempt); + Retry canRetry = errorsByServer.canRetryMore(numAttempt) + ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; + int failed = 0, stopped = 0; + boolean isReplica = false; + boolean firstAction = false; List> toReplay = new ArrayList>(); for (Map.Entry>> e : rsActions.actions.entrySet()) { byte[] regionName = e.getKey(); @@ -963,35 +1012,36 @@ class AsyncProcess { // TODO: depending on type of exception we might not want to update cache at all? hConnection.updateCachedLocations(tableName, regionName, row, null, server); for (Action action : e.getValue()) { - if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) { + if (firstAction) { + firstAction = false; + isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); + } + Retry retry = manageError( + action.getOriginalIndex(), action.getAction(), canRetry, t, server); + if (retry == Retry.YES) { toReplay.add(action); + } else if (retry == Retry.NO_OTHER_SUCCEEDED) { + ++stopped; + } else { + ++failed; } } } - logAndResubmit(server, toReplay, numAttempt, rsActions.size(), t); + if (toReplay.isEmpty()) { + logNoResubmit(server, numAttempt, rsActions.size(), t, isReplica, failed, stopped); + } else { + resubmit(server, toReplay, numAttempt, rsActions.size(), t, isReplica); + } } /** * Log as much info as possible, and, if there is something to replay, * submit it again after a back off sleep. + * @param isReplica */ - private void logAndResubmit(ServerName oldServer, List> toReplay, - int numAttempt, int failureCount, Throwable throwable) { - if (toReplay.isEmpty()) { - // it's either a success or a last failure - if (failureCount != 0) { - // We have a failure but nothing to retry. We're done, it's a final failure.. - LOG.warn(createLog(numAttempt, failureCount, toReplay.size(), - oldServer, throwable, -1, false, errorsByServer.getStartTrackingTime())); - } else if (numAttempt > startLogErrorsCnt + 1) { - // The operation was successful, but needed several attempts. Let's log this. - LOG.info(createLog(numAttempt, failureCount, 0, - oldServer, throwable, -1, false, errorsByServer.getStartTrackingTime())); - } - return; - } - + private void resubmit(ServerName oldServer, List> toReplay, + int numAttempt, int failureCount, Throwable throwable, boolean isReplica) { // We have something to replay. We're going to sleep a little before. // We have two contradicting needs here: @@ -1004,7 +1054,7 @@ class AsyncProcess { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on LOG.info(createLog(numAttempt, failureCount, toReplay.size(), - oldServer, throwable, backOffTime, true, errorsByServer.getStartTrackingTime())); + oldServer, throwable, backOffTime, true, null, isReplica, -1, -1)); } try { @@ -1018,6 +1068,21 @@ class AsyncProcess { groupAndSendMultiAction(toReplay, numAttempt + 1); } + private void logNoResubmit(ServerName oldServer, int numAttempt, + int failureCount, Throwable throwable, boolean isReplica, int failed, int stopped) { + if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) { + String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); + String logMessage = createLog(numAttempt, failureCount, 0, oldServer, + throwable, -1, false, timeStr, isReplica, failed, stopped); + if (failed != 0) { + // Only log final failures as warning + LOG.warn(logMessage); + } else { + LOG.info(logMessage); + } + } + } + /** * Called when we receive the result of a server query. * @@ -1042,6 +1107,9 @@ class AsyncProcess { boolean canRetry = true; // Go by original action. + int failed = 0, stopped = 0; + boolean isReplica = false; + boolean firstAction = false; for (Map.Entry>> regionEntry : multiAction.actions.entrySet()) { byte[] regionName = regionEntry.getKey(); Map regionResults = responses.getResults().get(regionName); @@ -1055,6 +1123,10 @@ class AsyncProcess { } boolean regionFailureRegistered = false; for (Action sentAction : regionEntry.getValue()) { + if (firstAction) { + firstAction = false; + isReplica = !RegionReplicaUtil.isDefaultReplica(sentAction.getReplicaId()); + } Object result = regionResults.get(sentAction.getOriginalIndex()); // Failure: retry if it's make sense else update the errors lists if (result == null || result instanceof Throwable) { @@ -1071,9 +1143,14 @@ class AsyncProcess { canRetry = errorsByServer.canRetryMore(numAttempt); } ++failureCount; - if (manageError( - sentAction.getOriginalIndex(), row, canRetry, (Throwable)result, server)) { + Retry retry = manageError(sentAction.getOriginalIndex(), row, + canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server); + if (retry == Retry.YES) { toReplay.add(sentAction); + } else if (retry == Retry.NO_OTHER_SUCCEEDED) { + ++stopped; + } else { + ++failed; } } else { if (callback != null) { @@ -1111,39 +1188,58 @@ class AsyncProcess { failureCount += actions.size(); for (Action action : actions) { + if (firstAction) { + firstAction = false; + isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); + } Row row = action.getAction(); - if (manageError(action.getOriginalIndex(), row, canRetry, throwable, server)) { + Retry retry = manageError(action.getOriginalIndex(), row, + canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); + if (retry == Retry.YES) { toReplay.add(action); + } else if (retry == Retry.NO_OTHER_SUCCEEDED) { + ++stopped; + } else { + ++failed; } } } - logAndResubmit(server, toReplay, numAttempt, failureCount, throwable); + if (toReplay.isEmpty()) { + logNoResubmit(server, numAttempt, failureCount, throwable, isReplica, failed, stopped); + } else { + resubmit(server, toReplay, numAttempt, failureCount, throwable, isReplica); + } } private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, - Throwable error, long backOffTime, boolean willRetry, String startTime){ + Throwable error, long backOffTime, boolean willRetry, String startTime, + boolean isReplica, int failed, int stopped) { StringBuilder sb = new StringBuilder(); - - sb.append("#").append(id).append(", table=").append(tableName). - append(", attempt=").append(numAttempt).append("/").append(numTries).append(" "); + sb.append("#").append(id).append(", table=").append(tableName).append(", ") + .append(isReplica ? "replica, " : "primary, ").append("attempt=").append(numAttempt) + .append("/").append(numTries).append(" "); if (failureCount > 0 || error != null){ sb.append("failed ").append(failureCount).append(" ops").append(", last exception: "). append(error == null ? "null" : error); } else { - sb.append("SUCCEEDED"); + sb.append("succeeded"); } - sb.append(" on ").append(sn); - - sb.append(", tracking started ").append(startTime); + sb.append(" on ").append(sn).append(", tracking started ").append(startTime); if (willRetry) { sb.append(", retrying after ").append(backOffTime).append(" ms"). - append(", replay ").append(replaySize).append(" ops."); + append(", replay ").append(replaySize).append(" ops"); } else if (failureCount > 0) { - sb.append(" - FAILED, NOT RETRYING ANYMORE"); + if (stopped > 0) { + sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); + } + if (failed > 0) { + sb.append("; not retrying ").append(failed).append(" - final failure"); + } + } return sb.toString(); @@ -1165,7 +1261,7 @@ class AsyncProcess { decActionCounter(index); return; // Simple case, no replica requests. } else if ((state = trySetResultSimple( - index, action.getAction(), result, isStale)) == null) { + index, action.getAction(), false, result, null, isStale)) == null) { return; // Simple case, no replica requests. } assert state != null; @@ -1204,8 +1300,8 @@ class AsyncProcess { errors.add(throwable, row, server); decActionCounter(index); return; // Simple case, no replica requests. - } else if ((state = trySetResultSimple(index, row, throwable, false)) == null) { - errors.add(throwable, row, server); + } else if ((state = trySetResultSimple( + index, row, true, throwable, server, false)) == null) { return; // Simple case, no replica requests. } assert state != null; @@ -1264,8 +1360,8 @@ class AsyncProcess { * Tries to set the result or error for a particular action as if there were no replica calls. * @return null if successful; replica state if there were in fact replica calls. */ - private ReplicaResultState trySetResultSimple( - int index, Row row, Object result, boolean isFromReplica) { + private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, + Object result, ServerName server, boolean isFromReplica) { Object resObj = null; if (!isReplicaGet(row)) { if (isFromReplica) { @@ -1282,11 +1378,20 @@ class AsyncProcess { } } } + + ReplicaResultState rrs = + (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; + if (rrs == null && isError) { + // The resObj is not replica state (null or already set). + errors.add((Throwable)result, row, server); + } + if (resObj == null) { + // resObj is null - no replica calls were made. decActionCounter(index); return null; } - return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; + return rrs; } private void decActionCounter(int index) { @@ -1528,4 +1633,15 @@ class AsyncProcess { private static boolean isReplicaGet(Row row) { return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE); } + + /** + * For manageError. Only used to make logging more clear, we don't actually care why we don't retry. + */ + private enum Retry { + YES, + NO_LOCATION_PROBLEM, + NO_NOT_RETRIABLE, + NO_RETRIES_EXHAUSTED, + NO_OTHER_SUCCEEDED + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index ef9a120..c75687a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -269,10 +269,4 @@ interface ClusterConnection extends HConnection { * @return Default AsyncProcess associated with this connection. */ AsyncProcess getAsyncProcess(); - - /** - * @return All locations for a particular region. - */ - RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException; - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 30555ff..c57064b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -210,11 +210,6 @@ class ConnectionAdapter implements ClusterConnection { } @Override - public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { - return wrappedConnection.locateRegionAll(tableName, row); - } - - @Override public void clearRegionCache() { wrappedConnection.clearRegionCache(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 2d97ebf..ff30ef4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -1028,15 +1028,9 @@ class ConnectionManager { } @Override - public RegionLocations locateRegionAll( - final TableName tableName, final byte[] row) throws IOException{ - return locateRegion(tableName, row, true, true); - } - - @Override public HRegionLocation locateRegion( final TableName tableName, final byte[] row) throws IOException{ - RegionLocations locations = locateRegionAll(tableName, row); + RegionLocations locations = locateRegion(tableName, row, true, true); return locations == null ? null : locations.getRegionLocation(); } @@ -2468,12 +2462,12 @@ class ConnectionManager { new ConcurrentHashMap(); private final long canRetryUntil; private final int maxRetries; - private final String startTrackingTime; + private final long startTrackingTime; public ServerErrorTracker(long timeout, int maxRetries) { this.maxRetries = maxRetries; this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout; - this.startTrackingTime = new Date().toString(); + this.startTrackingTime = new Date().getTime(); } /** @@ -2520,7 +2514,7 @@ class ConnectionManager { } } - String getStartTrackingTime() { + long getStartTrackingTime() { return startTrackingTime; } http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index cc69b4f..a0d80c6 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -37,8 +37,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.Timeout; import org.mockito.Mockito; import java.io.IOException; @@ -344,7 +346,8 @@ public class TestAsyncProcess { } @Override - public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { + public RegionLocations locateRegion(TableName tableName, + byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { return new RegionLocations(loc1); } } @@ -363,7 +366,8 @@ public class TestAsyncProcess { } @Override - public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { + public RegionLocations locateRegion(TableName tableName, + byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { int i = 0; for (HRegionLocation hr : hrl){ if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { @@ -377,6 +381,9 @@ public class TestAsyncProcess { } + @Rule + public Timeout timeout = new Timeout(10000); // 10 seconds max per method tested + @Test public void testSubmit() throws Exception { ClusterConnection hc = createHConnection(); @@ -627,8 +634,8 @@ public class TestAsyncProcess { private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) throws IOException { - Mockito.when(hc.locateRegionAll( - Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), + Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); } private static ClusterConnection createHConnectionCommon() { @@ -1009,7 +1016,7 @@ public class TestAsyncProcess { for (int i = 0; i < expecteds.length; ++i) { Object actual = actuals[i]; RR expected = expecteds[i]; - Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable); + Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable); if (expected != RR.FAILED && expected != RR.DONT_CARE) { Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index aef9f4e..a512f83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -443,9 +443,4 @@ class CoprocessorHConnection implements ClusterConnection { public AsyncProcess getAsyncProcess() { return delegate.getAsyncProcess(); } - - @Override - public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { - return delegate.locateRegionAll(tableName, row); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/579f305b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index f925fea..0f0104a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -111,8 +111,9 @@ public class HConnectionTestingUtility { thenReturn(loc); Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())). thenReturn(loc); - Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())). - thenReturn(new RegionLocations(loc)); + Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any(), + Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())) + .thenReturn(new RegionLocations(loc)); if (admin != null) { // If a call to getAdmin, return this implementation. Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).