Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3E7C21963E for ; Thu, 24 Mar 2016 15:54:56 +0000 (UTC) Received: (qmail 72005 invoked by uid 500); 24 Mar 2016 15:54:45 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 71927 invoked by uid 500); 24 Mar 2016 15:54:45 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 68294 invoked by uid 99); 24 Mar 2016 15:54:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Mar 2016 15:54:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 54B62DFCE0; Thu, 24 Mar 2016 15:54:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Thu, 24 Mar 2016 15:55:21 -0000 Message-Id: <34d51b04b66047119f8fbeb196036645@git.apache.org> In-Reply-To: <4b92113c539c459a9ed56e402685cba1@git.apache.org> References: <4b92113c539c459a9ed56e402685cba1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/51] [partial] hbase-site git commit: Published site at 52fd70500e0a00e273e2ec0c09d7c914b89432ce. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f30982bd/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFuture.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFuture.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFuture.html index ed7bc7f..e65df71 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFuture.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFuture.html @@ -1198,681 +1198,699 @@ 1190 byte[] row = e.getValue().iterator().next().getAction().getRow(); 1191 // Do not use the exception for updating cache because it might be coming from 1192 // any of the regions in the MultiAction. -1193 if (tableName != null) { -1194 connection.updateCachedLocations(tableName, regionName, row, -1195 ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server); -1196 } -1197 for (Action<Row> action : e.getValue()) { -1198 Retry retry = manageError( -1199 action.getOriginalIndex(), action.getAction(), canRetry, t, server); -1200 if (retry == Retry.YES) { -1201 toReplay.add(action); -1202 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { -1203 ++stopped; -1204 } else { -1205 ++failed; -1206 } -1207 } -1208 } -1209 -1210 if (toReplay.isEmpty()) { -1211 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); -1212 } else { -1213 resubmit(server, toReplay, numAttempt, rsActions.size(), t); +1193 try { +1194 if (tableName != null) { +1195 connection.updateCachedLocations(tableName, regionName, row, +1196 ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server); +1197 } +1198 } catch (Throwable ex) { +1199 // That should never happen, but if it did, we want to make sure +1200 // we still process errors +1201 LOG.error("Couldn't update cached region locations: " + ex); +1202 } +1203 for (Action<Row> action : e.getValue()) { +1204 Retry retry = manageError( +1205 action.getOriginalIndex(), action.getAction(), canRetry, t, server); +1206 if (retry == Retry.YES) { +1207 toReplay.add(action); +1208 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { +1209 ++stopped; +1210 } else { +1211 ++failed; +1212 } +1213 } 1214 } -1215 } -1216 -1217 /** -1218 * Log as much info as possible, and, if there is something to replay, -1219 * submit it again after a back off sleep. -1220 */ -1221 private void resubmit(ServerName oldServer, List<Action<Row>> toReplay, -1222 int numAttempt, int failureCount, Throwable throwable) { -1223 // We have something to replay. We're going to sleep a little before. -1224 -1225 // We have two contradicting needs here: -1226 // 1) We want to get the new location after having slept, as it may change. -1227 // 2) We want to take into account the location when calculating the sleep time. -1228 // 3) If all this is just because the response needed to be chunked try again FAST. -1229 // It should be possible to have some heuristics to take the right decision. Short term, -1230 // we go for one. -1231 boolean retryImmediately = throwable instanceof RetryImmediatelyException; -1232 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; -1233 long backOffTime = retryImmediately ? 0 : -1234 errorsByServer.calculateBackoffTime(oldServer, pause); -1235 if (numAttempt > startLogErrorsCnt) { -1236 // We use this value to have some logs when we have multiple failures, but not too many -1237 // logs, as errors are to be expected when a region moves, splits and so on -1238 LOG.info(createLog(numAttempt, failureCount, toReplay.size(), -1239 oldServer, throwable, backOffTime, true, null, -1, -1)); -1240 } -1241 -1242 try { -1243 if (backOffTime > 0) { -1244 Thread.sleep(backOffTime); -1245 } -1246 } catch (InterruptedException e) { -1247 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); -1248 Thread.currentThread().interrupt(); -1249 return; -1250 } -1251 -1252 groupAndSendMultiAction(toReplay, nextAttemptNumber); -1253 } -1254 -1255 private void logNoResubmit(ServerName oldServer, int numAttempt, -1256 int failureCount, Throwable throwable, int failed, int stopped) { -1257 if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) { -1258 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); -1259 String logMessage = createLog(numAttempt, failureCount, 0, oldServer, -1260 throwable, -1, false, timeStr, failed, stopped); -1261 if (failed != 0) { -1262 // Only log final failures as warning -1263 LOG.warn(logMessage); -1264 } else { -1265 LOG.info(logMessage); -1266 } -1267 } -1268 } -1269 -1270 /** -1271 * Called when we receive the result of a server query. -1272 * -1273 * @param multiAction - the multiAction we sent -1274 * @param server - the location. It's used as a server name. -1275 * @param responses - the response, if any -1276 * @param numAttempt - the attempt -1277 */ -1278 private void receiveMultiAction(MultiAction<Row> multiAction, -1279 ServerName server, MultiResponse responses, int numAttempt) { -1280 assert responses != null; -1281 -1282 // Success or partial success -1283 // Analyze detailed results. We can still have individual failures to be redo. -1284 // two specific throwables are managed: -1285 // - DoNotRetryIOException: we continue to retry for other actions -1286 // - RegionMovedException: we update the cache with the new region location +1215 +1216 if (toReplay.isEmpty()) { +1217 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); +1218 } else { +1219 resubmit(server, toReplay, numAttempt, rsActions.size(), t); +1220 } +1221 } +1222 +1223 /** +1224 * Log as much info as possible, and, if there is something to replay, +1225 * submit it again after a back off sleep. +1226 */ +1227 private void resubmit(ServerName oldServer, List<Action<Row>> toReplay, +1228 int numAttempt, int failureCount, Throwable throwable) { +1229 // We have something to replay. We're going to sleep a little before. +1230 +1231 // We have two contradicting needs here: +1232 // 1) We want to get the new location after having slept, as it may change. +1233 // 2) We want to take into account the location when calculating the sleep time. +1234 // 3) If all this is just because the response needed to be chunked try again FAST. +1235 // It should be possible to have some heuristics to take the right decision. Short term, +1236 // we go for one. +1237 boolean retryImmediately = throwable instanceof RetryImmediatelyException; +1238 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; +1239 long backOffTime = retryImmediately ? 0 : +1240 errorsByServer.calculateBackoffTime(oldServer, pause); +1241 if (numAttempt > startLogErrorsCnt) { +1242 // We use this value to have some logs when we have multiple failures, but not too many +1243 // logs, as errors are to be expected when a region moves, splits and so on +1244 LOG.info(createLog(numAttempt, failureCount, toReplay.size(), +1245 oldServer, throwable, backOffTime, true, null, -1, -1)); +1246 } +1247 +1248 try { +1249 if (backOffTime > 0) { +1250 Thread.sleep(backOffTime); +1251 } +1252 } catch (InterruptedException e) { +1253 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); +1254 Thread.currentThread().interrupt(); +1255 return; +1256 } +1257 +1258 groupAndSendMultiAction(toReplay, nextAttemptNumber); +1259 } +1260 +1261 private void logNoResubmit(ServerName oldServer, int numAttempt, +1262 int failureCount, Throwable throwable, int failed, int stopped) { +1263 if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) { +1264 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); +1265 String logMessage = createLog(numAttempt, failureCount, 0, oldServer, +1266 throwable, -1, false, timeStr, failed, stopped); +1267 if (failed != 0) { +1268 // Only log final failures as warning +1269 LOG.warn(logMessage); +1270 } else { +1271 LOG.info(logMessage); +1272 } +1273 } +1274 } +1275 +1276 /** +1277 * Called when we receive the result of a server query. +1278 * +1279 * @param multiAction - the multiAction we sent +1280 * @param server - the location. It's used as a server name. +1281 * @param responses - the response, if any +1282 * @param numAttempt - the attempt +1283 */ +1284 private void receiveMultiAction(MultiAction<Row> multiAction, +1285 ServerName server, MultiResponse responses, int numAttempt) { +1286 assert responses != null; 1287 -1288 List<Action<Row>> toReplay = new ArrayList<Action<Row>>(); -1289 Throwable throwable = null; -1290 int failureCount = 0; -1291 boolean canRetry = true; -1292 -1293 Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); -1294 updateStats(server, results); -1295 -1296 int failed = 0, stopped = 0; -1297 // Go by original action. -1298 for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) { -1299 byte[] regionName = regionEntry.getKey(); -1300 Map<Integer, Object> regionResults = results.get(regionName) == null -1301 ? null : results.get(regionName).result; -1302 if (regionResults == null) { -1303 if (!responses.getExceptions().containsKey(regionName)) { -1304 LOG.error("Server sent us neither results nor exceptions for " -1305 + Bytes.toStringBinary(regionName)); -1306 responses.getExceptions().put(regionName, new RuntimeException("Invalid response")); -1307 } -1308 continue; -1309 } -1310 boolean regionFailureRegistered = false; -1311 for (Action<Row> sentAction : regionEntry.getValue()) { -1312 Object result = regionResults.get(sentAction.getOriginalIndex()); -1313 // Failure: retry if it's make sense else update the errors lists -1314 if (result == null || result instanceof Throwable) { -1315 Row row = sentAction.getAction(); -1316 throwable = ClientExceptionsUtil.findException(result); -1317 // Register corresponding failures once per server/once per region. -1318 if (!regionFailureRegistered) { -1319 regionFailureRegistered = true; -1320 connection.updateCachedLocations( -1321 tableName, regionName, row.getRow(), result, server); -1322 } -1323 if (failureCount == 0) { -1324 errorsByServer.reportServerError(server); -1325 // We determine canRetry only once for all calls, after reporting server failure. -1326 canRetry = errorsByServer.canTryMore(numAttempt); -1327 } -1328 ++failureCount; -1329 Retry retry = manageError(sentAction.getOriginalIndex(), row, -1330 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); -1331 if (retry == Retry.YES) { -1332 toReplay.add(sentAction); -1333 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { -1334 ++stopped; -1335 } else { -1336 ++failed; -1337 } -1338 } else { -1339 if (callback != null) { -1340 try { -1341 //noinspection unchecked -1342 // TODO: would callback expect a replica region name if it gets one? -1343 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); -1344 } catch (Throwable t) { -1345 LOG.error("User callback threw an exception for " -1346 + Bytes.toStringBinary(regionName) + ", ignoring", t); -1347 } -1348 } -1349 setResult(sentAction, result); -1350 } -1351 } -1352 } -1353 -1354 // The failures global to a region. We will use for multiAction we sent previously to find the -1355 // actions to replay. -1356 for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) { -1357 throwable = throwableEntry.getValue(); -1358 byte[] region = throwableEntry.getKey(); -1359 List<Action<Row>> actions = multiAction.actions.get(region); -1360 if (actions == null || actions.isEmpty()) { -1361 throw new IllegalStateException("Wrong response for the region: " + -1362 HRegionInfo.encodeRegionName(region)); +1288 // Success or partial success +1289 // Analyze detailed results. We can still have individual failures to be redo. +1290 // two specific throwables are managed: +1291 // - DoNotRetryIOException: we continue to retry for other actions +1292 // - RegionMovedException: we update the cache with the new region location +1293 +1294 List<Action<Row>> toReplay = new ArrayList<Action<Row>>(); +1295 Throwable throwable = null; +1296 int failureCount = 0; +1297 boolean canRetry = true; +1298 +1299 Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); +1300 updateStats(server, results); +1301 +1302 int failed = 0, stopped = 0; +1303 // Go by original action. +1304 for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) { +1305 byte[] regionName = regionEntry.getKey(); +1306 Map<Integer, Object> regionResults = results.get(regionName) == null +1307 ? null : results.get(regionName).result; +1308 if (regionResults == null) { +1309 if (!responses.getExceptions().containsKey(regionName)) { +1310 LOG.error("Server sent us neither results nor exceptions for " +1311 + Bytes.toStringBinary(regionName)); +1312 responses.getExceptions().put(regionName, new RuntimeException("Invalid response")); +1313 } +1314 continue; +1315 } +1316 boolean regionFailureRegistered = false; +1317 for (Action<Row> sentAction : regionEntry.getValue()) { +1318 Object result = regionResults.get(sentAction.getOriginalIndex()); +1319 // Failure: retry if it's make sense else update the errors lists +1320 if (result == null || result instanceof Throwable) { +1321 Row row = sentAction.getAction(); +1322 throwable = ClientExceptionsUtil.findException(result); +1323 // Register corresponding failures once per server/once per region. +1324 if (!regionFailureRegistered) { +1325 regionFailureRegistered = true; +1326 try { +1327 connection.updateCachedLocations( +1328 tableName, regionName, row.getRow(), result, server); +1329 } catch (Throwable ex) { +1330 // That should never happen, but if it did, we want to make sure +1331 // we still process errors +1332 LOG.error("Couldn't update cached region locations: " + ex); +1333 } +1334 } +1335 if (failureCount == 0) { +1336 errorsByServer.reportServerError(server); +1337 // We determine canRetry only once for all calls, after reporting server failure. +1338 canRetry = errorsByServer.canTryMore(numAttempt); +1339 } +1340 ++failureCount; +1341 Retry retry = manageError(sentAction.getOriginalIndex(), row, +1342 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); +1343 if (retry == Retry.YES) { +1344 toReplay.add(sentAction); +1345 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { +1346 ++stopped; +1347 } else { +1348 ++failed; +1349 } +1350 } else { +1351 if (callback != null) { +1352 try { +1353 //noinspection unchecked +1354 // TODO: would callback expect a replica region name if it gets one? +1355 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); +1356 } catch (Throwable t) { +1357 LOG.error("User callback threw an exception for " +1358 + Bytes.toStringBinary(regionName) + ", ignoring", t); +1359 } +1360 } +1361 setResult(sentAction, result); +1362 } 1363 } -1364 -1365 if (failureCount == 0) { -1366 errorsByServer.reportServerError(server); -1367 canRetry = errorsByServer.canTryMore(numAttempt); -1368 } -1369 if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { -1370 // For multi-actions, we don't have a table name, but we want to make sure to clear the -1371 // cache in case there were location-related exceptions. We don't to clear the cache -1372 // for every possible exception that comes through, however. -1373 connection.clearCaches(server); -1374 } else { -1375 connection.updateCachedLocations( -1376 tableName, region, actions.get(0).getAction().getRow(), throwable, server); -1377 } -1378 failureCount += actions.size(); -1379 -1380 for (Action<Row> action : actions) { -1381 Row row = action.getAction(); -1382 Retry retry = manageError(action.getOriginalIndex(), row, -1383 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); -1384 if (retry == Retry.YES) { -1385 toReplay.add(action); -1386 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { -1387 ++stopped; -1388 } else { -1389 ++failed; -1390 } -1391 } -1392 } -1393 if (toReplay.isEmpty()) { -1394 logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); -1395 } else { -1396 resubmit(server, toReplay, numAttempt, failureCount, throwable); -1397 } -1398 } -1399 -1400 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, -1401 Throwable error, long backOffTime, boolean willRetry, String startTime, -1402 int failed, int stopped) { -1403 StringBuilder sb = new StringBuilder(); -1404 sb.append("#").append(id).append(", table=").append(tableName).append(", ") -1405 .append("attempt=").append(numAttempt) -1406 .append("/").append(numTries).append(" "); -1407 -1408 if (failureCount > 0 || error != null){ -1409 sb.append("failed=").append(failureCount).append("ops").append(", last exception: "). -1410 append(error == null ? "null" : error); -1411 } else { -1412 sb.append("succeeded"); -1413 } -1414 -1415 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); -1416 -1417 if (willRetry) { -1418 sb.append(", retrying after=").append(backOffTime).append("ms"). -1419 append(", replay=").append(replaySize).append("ops"); -1420 } else if (failureCount > 0) { -1421 if (stopped > 0) { -1422 sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); -1423 } -1424 if (failed > 0) { -1425 sb.append("; not retrying ").append(failed).append(" - final failure"); -1426 } -1427 -1428 } -1429 -1430 return sb.toString(); -1431 } +1364 } +1365 +1366 // The failures global to a region. We will use for multiAction we sent previously to find the +1367 // actions to replay. +1368 for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) { +1369 throwable = throwableEntry.getValue(); +1370 byte[] region = throwableEntry.getKey(); +1371 List<Action<Row>> actions = multiAction.actions.get(region); +1372 if (actions == null || actions.isEmpty()) { +1373 throw new IllegalStateException("Wrong response for the region: " + +1374 HRegionInfo.encodeRegionName(region)); +1375 } +1376 +1377 if (failureCount == 0) { +1378 errorsByServer.reportServerError(server); +1379 canRetry = errorsByServer.canTryMore(numAttempt); +1380 } +1381 if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { +1382 // For multi-actions, we don't have a table name, but we want to make sure to clear the +1383 // cache in case there were location-related exceptions. We don't to clear the cache +1384 // for every possible exception that comes through, however. +1385 connection.clearCaches(server); +1386 } else { +1387 try { +1388 connection.updateCachedLocations( +1389 tableName, region, actions.get(0).getAction().getRow(), throwable, server); +1390 } catch (Throwable ex) { +1391 // That should never happen, but if it did, we want to make sure +1392 // we still process errors +1393 LOG.error("Couldn't update cached region locations: " + ex); +1394 } +1395 } +1396 failureCount += actions.size(); +1397 +1398 for (Action<Row> action : actions) { +1399 Row row = action.getAction(); +1400 Retry retry = manageError(action.getOriginalIndex(), row, +1401 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); +1402 if (retry == Retry.YES) { +1403 toReplay.add(action); +1404 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { +1405 ++stopped; +1406 } else { +1407 ++failed; +1408 } +1409 } +1410 } +1411 if (toReplay.isEmpty()) { +1412 logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); +1413 } else { +1414 resubmit(server, toReplay, numAttempt, failureCount, throwable); +1415 } +1416 } +1417 +1418 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, +1419 Throwable error, long backOffTime, boolean willRetry, String startTime, +1420 int failed, int stopped) { +1421 StringBuilder sb = new StringBuilder(); +1422 sb.append("#").append(id).append(", table=").append(tableName).append(", ") +1423 .append("attempt=").append(numAttempt) +1424 .append("/").append(numTries).append(" "); +1425 +1426 if (failureCount > 0 || error != null){ +1427 sb.append("failed=").append(failureCount).append("ops").append(", last exception: "). +1428 append(error == null ? "null" : error); +1429 } else { +1430 sb.append("succeeded"); +1431 } 1432 -1433 /** -1434 * Sets the non-error result from a particular action. -1435 * @param action Action (request) that the server responded to. -1436 * @param result The result. -1437 */ -1438 private void setResult(Action<Row> action, Object result) { -1439 if (result == null) { -1440 throw new RuntimeException("Result cannot be null"); -1441 } -1442 ReplicaResultState state = null; -1443 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); -1444 int index = action.getOriginalIndex(); -1445 if (results == null) { -1446 decActionCounter(index); -1447 return; // Simple case, no replica requests. -1448 } -1449 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); -1450 if (state == null) { -1451 return; // Simple case, no replica requests. -1452 } -1453 // At this point we know that state is set to replica tracking class. -1454 // It could be that someone else is also looking at it; however, we know there can -1455 // only be one state object, and only one thread can set callCount to 0. Other threads -1456 // will either see state with callCount 0 after locking it; or will not see state at all -1457 // we will replace it with the result. -1458 synchronized (state) { -1459 if (state.callCount == 0) { -1460 return; // someone already set the result -1461 } -1462 state.callCount = 0; -1463 } -1464 synchronized (replicaResultLock) { -1465 if (results[index] != state) { -1466 throw new AssertionError("We set the callCount but someone else replaced the result"); -1467 } -1468 results[index] = result; -1469 } -1470 -1471 decActionCounter(index); -1472 } -1473 -1474 /** -1475 * Sets the error from a particular action. -1476 * @param index Original action index. -1477 * @param row Original request. -1478 * @param throwable The resulting error. -1479 * @param server The source server. -1480 */ -1481 private void setError(int index, Row row, Throwable throwable, ServerName server) { -1482 ReplicaResultState state = null; -1483 if (results == null) { -1484 // Note that we currently cannot have replica requests with null results. So it shouldn't -1485 // happen that multiple replica calls will call dAC for same actions with results == null. -1486 // Only one call per action should be present in this case. -1487 errors.add(throwable, row, server); -1488 decActionCounter(index); -1489 return; // Simple case, no replica requests. -1490 } -1491 state = trySetResultSimple(index, row, true, throwable, server, false); -1492 if (state == null) { -1493 return; // Simple case, no replica requests. -1494 } -1495 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. -1496 boolean isActionDone = false; -1497 synchronized (state) { -1498 switch (state.callCount) { -1499 case 0: return; // someone already set the result -1500 case 1: { // All calls failed, we are the last error. -1501 target = errors; -1502 isActionDone = true; -1503 break; -1504 } -1505 default: { -1506 assert state.callCount > 1; -1507 if (state.replicaErrors == null) { -1508 state.replicaErrors = new BatchErrors(); -1509 } -1510 target = state.replicaErrors; -1511 break; -1512 } -1513 } -1514 --state.callCount; -1515 } -1516 target.add(throwable, row, server); -1517 if (isActionDone) { -1518 if (state.replicaErrors != null) { // last call, no need to lock -1519 errors.merge(state.replicaErrors); -1520 } -1521 // See setResult for explanations. -1522 synchronized (replicaResultLock) { -1523 if (results[index] != state) { -1524 throw new AssertionError("We set the callCount but someone else replaced the result"); -1525 } -1526 results[index] = throwable; -1527 } -1528 decActionCounter(index); -1529 } -1530 } -1531 -1532 /** -1533 * Checks if the action is complete; used on error to prevent needless retries. -1534 * Does not synchronize, assuming element index/field accesses are atomic. -1535 * This is an opportunistic optimization check, doesn't have to be strict. -1536 * @param index Original action index. -1537 * @param row Original request. -1538 */ -1539 private boolean isActionComplete(int index, Row row) { -1540 if (!isReplicaGet(row)) return false; -1541 Object resObj = results[index]; -1542 return (resObj != null) && (!(resObj instanceof ReplicaResultState) -1543 || ((ReplicaResultState)resObj).callCount == 0); -1544 } -1545 -1546 /** -1547 * Tries to set the result or error for a particular action as if there were no replica calls. -1548 * @return null if successful; replica state if there were in fact replica calls. -1549 */ -1550 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, -1551 Object result, ServerName server, boolean isFromReplica) { -1552 Object resObj = null; -1553 if (!isReplicaGet(row)) { -1554 if (isFromReplica) { -1555 throw new AssertionError("Unexpected stale result for " + row); -1556 } -1557 results[index] = result; -1558 } else { -1559 synchronized (replicaResultLock) { -1560 resObj = results[index]; -1561 if (resObj == null) { -1562 if (isFromReplica) { -1563 throw new AssertionError("Unexpected stale result for " + row); -1564 } -1565 results[index] = result; -1566 } -1567 } -1568 } -1569 -1570 ReplicaResultState rrs = -1571 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; -1572 if (rrs == null && isError) { -1573 // The resObj is not replica state (null or already set). -1574 errors.add((Throwable)result, row, server); -1575 } -1576 -1577 if (resObj == null) { -1578 // resObj is null - no replica calls were made. -1579 decActionCounter(index); -1580 return null; -1581 } -1582 return rrs; -1583 } -1584 -1585 private void decActionCounter(int index) { -1586 long actionsRemaining = actionsInProgress.decrementAndGet(); -1587 if (actionsRemaining < 0) { -1588 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); -1589 throw new AssertionError(error); -1590 } else if (actionsRemaining == 0) { -1591 synchronized (actionsInProgress) { -1592 actionsInProgress.notifyAll(); -1593 } -1594 } -1595 } -1596 -1597 private String buildDetailedErrorMsg(String string, int index) { -1598 StringBuilder error = new StringBuilder(128); -1599 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") -1600 .append(actionsInProgress.get()).append("; replica gets: "); -1601 if (replicaGetIndices != null) { -1602 for (int i = 0; i < replicaGetIndices.length; ++i) { -1603 error.append(replicaGetIndices[i]).append(", "); -1604 } -1605 } else { -1606 error.append(hasAnyReplicaGets ? "all" : "none"); -1607 } -1608 error.append("; results "); -1609 if (results != null) { -1610 for (int i = 0; i < results.length; ++i) { -1611 Object o = results[i]; -1612 error.append(((o == null) ? "null" : o.toString())).append(", "); -1613 } -1614 } -1615 return error.toString(); -1616 } -1617 -1618 @Override -1619 public void waitUntilDone() throws InterruptedIOException { -1620 try { -1621 waitUntilDone(Long.MAX_VALUE); -1622 } catch (InterruptedException iex) { -1623 throw new InterruptedIOException(iex.getMessage()); -1624 } finally { -1625 if (callsInProgress != null) { -1626 for (PayloadCarryingServerCallable clb : callsInProgress) { -1627 clb.cancel(); -1628 } -1629 } -1630 } -1631 } -1632 -1633 private boolean waitUntilDone(long cutoff) throws InterruptedException { -1634 boolean hasWait = cutoff != Long.MAX_VALUE; -1635 long lastLog = EnvironmentEdgeManager.currentTime(); -1636 long currentInProgress; -1637 while (0 != (currentInProgress = actionsInProgress.get())) { -1638 long now = EnvironmentEdgeManager.currentTime(); -1639 if (hasWait && (now * 1000L) > cutoff) { -1640 return false; -1641 } -1642 if (!hasWait) { // Only log if wait is infinite. -1643 if (now > lastLog + 10000) { -1644 lastLog = now; -1645 LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish"); +1433 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); +1434 +1435 if (willRetry) { +1436 sb.append(", retrying after=").append(backOffTime).append("ms"). +1437 append(", replay=").append(replaySize).append("ops"); +1438 } else if (failureCount > 0) { +1439 if (stopped > 0) { +1440 sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); +1441 } +1442 if (failed > 0) { +1443 sb.append("; not retrying ").append(failed).append(" - final failure"); +1444 } +1445 +1446 } +1447 +1448 return sb.toString(); +1449 } +1450 +1451 /** +1452 * Sets the non-error result from a particular action. +1453 * @param action Action (request) that the server responded to. +1454 * @param result The result. +1455 */ +1456 private void setResult(Action<Row> action, Object result) { +1457 if (result == null) { +1458 throw new RuntimeException("Result cannot be null"); +1459 } +1460 ReplicaResultState state = null; +1461 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); +1462 int index = action.getOriginalIndex(); +1463 if (results == null) { +1464 decActionCounter(index); +1465 return; // Simple case, no replica requests. +1466 } +1467 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); +1468 if (state == null) { +1469 return; // Simple case, no replica requests. +1470 } +1471 // At this point we know that state is set to replica tracking class. +1472 // It could be that someone else is also looking at it; however, we know there can +1473 // only be one state object, and only one thread can set callCount to 0. Other threads +1474 // will either see state with callCount 0 after locking it; or will not see state at all +1475 // we will replace it with the result. +1476 synchronized (state) { +1477 if (state.callCount == 0) { +1478 return; // someone already set the result +1479 } +1480 state.callCount = 0; +1481 } +1482 synchronized (replicaResultLock) { +1483 if (results[index] != state) { +1484 throw new AssertionError("We set the callCount but someone else replaced the result"); +1485 } +1486 results[index] = result; +1487 } +1488 +1489 decActionCounter(index); +1490 } +1491 +1492 /** +1493 * Sets the error from a particular action. +1494 * @param index Original action index. +1495 * @param row Original request. +1496 * @param throwable The resulting error. +1497 * @param server The source server. +1498 */ +1499 private void setError(int index, Row row, Throwable throwable, ServerName server) { +1500 ReplicaResultState state = null; +1501 if (results == null) { +1502 // Note that we currently cannot have replica requests with null results. So it shouldn't +1503 // happen that multiple replica calls will call dAC for same actions with results == null. +1504 // Only one call per action should be present in this case. +1505 errors.add(throwable, row, server); +1506 decActionCounter(index); +1507 return; // Simple case, no replica requests. +1508 } +1509 state = trySetResultSimple(index, row, true, throwable, server, false); +1510 if (state == null) { +1511 return; // Simple case, no replica requests. +1512 } +1513 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. +1514 boolean isActionDone = false; +1515 synchronized (state) { +1516 switch (state.callCount) { +1517 case 0: return; // someone already set the result +1518 case 1: { // All calls failed, we are the last error. +1519 target = errors; +1520 isActionDone = true; +1521 break; +1522 } +1523 default: { +1524 assert state.callCount > 1; +1525 if (state.replicaErrors == null) { +1526 state.replicaErrors = new BatchErrors(); +1527 } +1528 target = state.replicaErrors; +1529 break; +1530 } +1531 } +1532 --state.callCount; +1533 } +1534 target.add(throwable, row, server); +1535 if (isActionDone) { +1536 if (state.replicaErrors != null) { // last call, no need to lock +1537 errors.merge(state.replicaErrors); +1538 } +1539 // See setResult for explanations. +1540 synchronized (replicaResultLock) { +1541 if (results[index] != state) { +1542 throw new AssertionError("We set the callCount but someone else replaced the result"); +1543 } +1544 results[index] = throwable; +1545 } +1546 decActionCounter(index); +1547 } +1548 } +1549 +1550 /** +1551 * Checks if the action is complete; used on error to prevent needless retries. +1552 * Does not synchronize, assuming element index/field accesses are atomic. +1553 * This is an opportunistic optimization check, doesn't have to be strict. +1554 * @param index Original action index. +1555 * @param row Original request. +1556 */ +1557 private boolean isActionComplete(int index, Row row) { +1558 if (!isReplicaGet(row)) return false; +1559 Object resObj = results[index]; +1560 return (resObj != null) && (!(resObj instanceof ReplicaResultState) +1561 || ((ReplicaResultState)resObj).callCount == 0); +1562 } +1563 +1564 /** +1565 * Tries to set the result or error for a particular action as if there were no replica calls. +1566 * @return null if successful; replica state if there were in fact replica calls. +1567 */ +1568 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, +1569 Object result, ServerName server, boolean isFromReplica) { +1570 Object resObj = null; +1571 if (!isReplicaGet(row)) { +1572 if (isFromReplica) { +1573 throw new AssertionError("Unexpected stale result for " + row); +1574 } +1575 results[index] = result; +1576 } else { +1577 synchronized (replicaResultLock) { +1578 resObj = results[index]; +1579 if (resObj == null) { +1580 if (isFromReplica) { +1581 throw new AssertionError("Unexpected stale result for " + row); +1582 } +1583 results[index] = result; +1584 } +1585 } +1586 } +1587 +1588 ReplicaResultState rrs = +1589