Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C307018D87 for ; Tue, 23 Feb 2016 17:08:17 +0000 (UTC) Received: (qmail 25820 invoked by uid 500); 23 Feb 2016 17:08:11 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 25700 invoked by uid 500); 23 Feb 2016 17:08:11 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 23803 invoked by uid 99); 23 Feb 2016 17:08:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Feb 2016 17:08:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6ABACE8E7F; Tue, 23 Feb 2016 17:08:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Tue, 23 Feb 2016 17:08:39 -0000 Message-Id: <29f1eb931fee475da25775ef94f4912b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/51] [partial] hbase-site git commit: Published site at 58283fa1b1b10beec62cefa40babff6a1424b06c. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d02dd5db/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.ReplicaCallIssuingRunnable.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.ReplicaCallIssuingRunnable.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.ReplicaCallIssuingRunnable.html index 3180076..d813e71 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.ReplicaCallIssuingRunnable.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.ReplicaCallIssuingRunnable.html @@ -1368,474 +1368,481 @@ 1360 errorsByServer.reportServerError(server); 1361 canRetry = errorsByServer.canTryMore(numAttempt); 1362 } -1363 connection.updateCachedLocations( -1364 tableName, region, actions.get(0).getAction().getRow(), throwable, server); -1365 failureCount += actions.size(); -1366 -1367 for (Action<Row> action : actions) { -1368 Row row = action.getAction(); -1369 Retry retry = manageError(action.getOriginalIndex(), row, -1370 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); -1371 if (retry == Retry.YES) { -1372 toReplay.add(action); -1373 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { -1374 ++stopped; -1375 } else { -1376 ++failed; -1377 } -1378 } -1379 } -1380 -1381 if (toReplay.isEmpty()) { -1382 logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); -1383 } else { -1384 resubmit(server, toReplay, numAttempt, failureCount, throwable); -1385 } -1386 } +1363 if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { +1364 // For multi-actions, we don't have a table name, but we want to make sure to clear the +1365 // cache in case there were location-related exceptions. We don't to clear the cache +1366 // for every possible exception that comes through, however. +1367 connection.clearCaches(server); +1368 } else { +1369 connection.updateCachedLocations( +1370 tableName, region, actions.get(0).getAction().getRow(), throwable, server); +1371 } +1372 failureCount += actions.size(); +1373 +1374 for (Action<Row> action : actions) { +1375 Row row = action.getAction(); +1376 Retry retry = manageError(action.getOriginalIndex(), row, +1377 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); +1378 if (retry == Retry.YES) { +1379 toReplay.add(action); +1380 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { +1381 ++stopped; +1382 } else { +1383 ++failed; +1384 } +1385 } +1386 } 1387 -1388 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, -1389 Throwable error, long backOffTime, boolean willRetry, String startTime, -1390 int failed, int stopped) { -1391 StringBuilder sb = new StringBuilder(); -1392 sb.append("#").append(id).append(", table=").append(tableName).append(", ") -1393 .append("attempt=").append(numAttempt) -1394 .append("/").append(numTries).append(" "); -1395 -1396 if (failureCount > 0 || error != null){ -1397 sb.append("failed=").append(failureCount).append("ops").append(", last exception: "). -1398 append(error == null ? "null" : error); -1399 } else { -1400 sb.append("succeeded"); -1401 } +1388 if (toReplay.isEmpty()) { +1389 logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); +1390 } else { +1391 resubmit(server, toReplay, numAttempt, failureCount, throwable); +1392 } +1393 } +1394 +1395 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, +1396 Throwable error, long backOffTime, boolean willRetry, String startTime, +1397 int failed, int stopped) { +1398 StringBuilder sb = new StringBuilder(); +1399 sb.append("#").append(id).append(", table=").append(tableName).append(", ") +1400 .append("attempt=").append(numAttempt) +1401 .append("/").append(numTries).append(" "); 1402 -1403 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); -1404 -1405 if (willRetry) { -1406 sb.append(", retrying after=").append(backOffTime).append("ms"). -1407 append(", replay=").append(replaySize).append("ops"); -1408 } else if (failureCount > 0) { -1409 if (stopped > 0) { -1410 sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); -1411 } -1412 if (failed > 0) { -1413 sb.append("; not retrying ").append(failed).append(" - final failure"); -1414 } -1415 -1416 } -1417 -1418 return sb.toString(); -1419 } -1420 -1421 /** -1422 * Sets the non-error result from a particular action. -1423 * @param action Action (request) that the server responded to. -1424 * @param result The result. -1425 */ -1426 private void setResult(Action<Row> action, Object result) { -1427 if (result == null) { -1428 throw new RuntimeException("Result cannot be null"); -1429 } -1430 ReplicaResultState state = null; -1431 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); -1432 int index = action.getOriginalIndex(); -1433 if (results == null) { -1434 decActionCounter(index); -1435 return; // Simple case, no replica requests. +1403 if (failureCount > 0 || error != null){ +1404 sb.append("failed=").append(failureCount).append("ops").append(", last exception: "). +1405 append(error == null ? "null" : error); +1406 } else { +1407 sb.append("succeeded"); +1408 } +1409 +1410 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); +1411 +1412 if (willRetry) { +1413 sb.append(", retrying after=").append(backOffTime).append("ms"). +1414 append(", replay=").append(replaySize).append("ops"); +1415 } else if (failureCount > 0) { +1416 if (stopped > 0) { +1417 sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); +1418 } +1419 if (failed > 0) { +1420 sb.append("; not retrying ").append(failed).append(" - final failure"); +1421 } +1422 +1423 } +1424 +1425 return sb.toString(); +1426 } +1427 +1428 /** +1429 * Sets the non-error result from a particular action. +1430 * @param action Action (request) that the server responded to. +1431 * @param result The result. +1432 */ +1433 private void setResult(Action<Row> action, Object result) { +1434 if (result == null) { +1435 throw new RuntimeException("Result cannot be null"); 1436 } -1437 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); -1438 if (state == null) { -1439 return; // Simple case, no replica requests. -1440 } -1441 // At this point we know that state is set to replica tracking class. -1442 // It could be that someone else is also looking at it; however, we know there can -1443 // only be one state object, and only one thread can set callCount to 0. Other threads -1444 // will either see state with callCount 0 after locking it; or will not see state at all -1445 // we will replace it with the result. -1446 synchronized (state) { -1447 if (state.callCount == 0) { -1448 return; // someone already set the result -1449 } -1450 state.callCount = 0; -1451 } -1452 synchronized (replicaResultLock) { -1453 if (results[index] != state) { -1454 throw new AssertionError("We set the callCount but someone else replaced the result"); -1455 } -1456 results[index] = result; -1457 } -1458 -1459 decActionCounter(index); -1460 } -1461 -1462 /** -1463 * Sets the error from a particular action. -1464 * @param index Original action index. -1465 * @param row Original request. -1466 * @param throwable The resulting error. -1467 * @param server The source server. -1468 */ -1469 private void setError(int index, Row row, Throwable throwable, ServerName server) { -1470 ReplicaResultState state = null; -1471 if (results == null) { -1472 // Note that we currently cannot have replica requests with null results. So it shouldn't -1473 // happen that multiple replica calls will call dAC for same actions with results == null. -1474 // Only one call per action should be present in this case. -1475 errors.add(throwable, row, server); -1476 decActionCounter(index); -1477 return; // Simple case, no replica requests. -1478 } -1479 state = trySetResultSimple(index, row, true, throwable, server, false); -1480 if (state == null) { -1481 return; // Simple case, no replica requests. -1482 } -1483 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. -1484 boolean isActionDone = false; -1485 synchronized (state) { -1486 switch (state.callCount) { -1487 case 0: return; // someone already set the result -1488 case 1: { // All calls failed, we are the last error. -1489 target = errors; -1490 isActionDone = true; -1491 break; -1492 } -1493 default: { -1494 assert state.callCount > 1; -1495 if (state.replicaErrors == null) { -1496 state.replicaErrors = new BatchErrors(); -1497 } -1498 target = state.replicaErrors; -1499 break; -1500 } -1501 } -1502 --state.callCount; -1503 } -1504 target.add(throwable, row, server); -1505 if (isActionDone) { -1506 if (state.replicaErrors != null) { // last call, no need to lock -1507 errors.merge(state.replicaErrors); +1437 ReplicaResultState state = null; +1438 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); +1439 int index = action.getOriginalIndex(); +1440 if (results == null) { +1441 decActionCounter(index); +1442 return; // Simple case, no replica requests. +1443 } +1444 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); +1445 if (state == null) { +1446 return; // Simple case, no replica requests. +1447 } +1448 // At this point we know that state is set to replica tracking class. +1449 // It could be that someone else is also looking at it; however, we know there can +1450 // only be one state object, and only one thread can set callCount to 0. Other threads +1451 // will either see state with callCount 0 after locking it; or will not see state at all +1452 // we will replace it with the result. +1453 synchronized (state) { +1454 if (state.callCount == 0) { +1455 return; // someone already set the result +1456 } +1457 state.callCount = 0; +1458 } +1459 synchronized (replicaResultLock) { +1460 if (results[index] != state) { +1461 throw new AssertionError("We set the callCount but someone else replaced the result"); +1462 } +1463 results[index] = result; +1464 } +1465 +1466 decActionCounter(index); +1467 } +1468 +1469 /** +1470 * Sets the error from a particular action. +1471 * @param index Original action index. +1472 * @param row Original request. +1473 * @param throwable The resulting error. +1474 * @param server The source server. +1475 */ +1476 private void setError(int index, Row row, Throwable throwable, ServerName server) { +1477 ReplicaResultState state = null; +1478 if (results == null) { +1479 // Note that we currently cannot have replica requests with null results. So it shouldn't +1480 // happen that multiple replica calls will call dAC for same actions with results == null. +1481 // Only one call per action should be present in this case. +1482 errors.add(throwable, row, server); +1483 decActionCounter(index); +1484 return; // Simple case, no replica requests. +1485 } +1486 state = trySetResultSimple(index, row, true, throwable, server, false); +1487 if (state == null) { +1488 return; // Simple case, no replica requests. +1489 } +1490 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. +1491 boolean isActionDone = false; +1492 synchronized (state) { +1493 switch (state.callCount) { +1494 case 0: return; // someone already set the result +1495 case 1: { // All calls failed, we are the last error. +1496 target = errors; +1497 isActionDone = true; +1498 break; +1499 } +1500 default: { +1501 assert state.callCount > 1; +1502 if (state.replicaErrors == null) { +1503 state.replicaErrors = new BatchErrors(); +1504 } +1505 target = state.replicaErrors; +1506 break; +1507 } 1508 } -1509 // See setResult for explanations. -1510 synchronized (replicaResultLock) { -1511 if (results[index] != state) { -1512 throw new AssertionError("We set the callCount but someone else replaced the result"); -1513 } -1514 results[index] = throwable; +1509 --state.callCount; +1510 } +1511 target.add(throwable, row, server); +1512 if (isActionDone) { +1513 if (state.replicaErrors != null) { // last call, no need to lock +1514 errors.merge(state.replicaErrors); 1515 } -1516 decActionCounter(index); -1517 } -1518 } -1519 -1520 /** -1521 * Checks if the action is complete; used on error to prevent needless retries. -1522 * Does not synchronize, assuming element index/field accesses are atomic. -1523 * This is an opportunistic optimization check, doesn't have to be strict. -1524 * @param index Original action index. -1525 * @param row Original request. -1526 */ -1527 private boolean isActionComplete(int index, Row row) { -1528 if (!isReplicaGet(row)) return false; -1529 Object resObj = results[index]; -1530 return (resObj != null) && (!(resObj instanceof ReplicaResultState) -1531 || ((ReplicaResultState)resObj).callCount == 0); -1532 } -1533 -1534 /** -1535 * Tries to set the result or error for a particular action as if there were no replica calls. -1536 * @return null if successful; replica state if there were in fact replica calls. -1537 */ -1538 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, -1539 Object result, ServerName server, boolean isFromReplica) { -1540 Object resObj = null; -1541 if (!isReplicaGet(row)) { -1542 if (isFromReplica) { -1543 throw new AssertionError("Unexpected stale result for " + row); -1544 } -1545 results[index] = result; -1546 } else { -1547 synchronized (replicaResultLock) { -1548 resObj = results[index]; -1549 if (resObj == null) { -1550 if (isFromReplica) { -1551 throw new AssertionError("Unexpected stale result for " + row); -1552 } -1553 results[index] = result; -1554 } -1555 } -1556 } -1557 -1558 ReplicaResultState rrs = -1559 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; -1560 if (rrs == null && isError) { -1561 // The resObj is not replica state (null or already set). -1562 errors.add((Throwable)result, row, server); +1516 // See setResult for explanations. +1517 synchronized (replicaResultLock) { +1518 if (results[index] != state) { +1519 throw new AssertionError("We set the callCount but someone else replaced the result"); +1520 } +1521 results[index] = throwable; +1522 } +1523 decActionCounter(index); +1524 } +1525 } +1526 +1527 /** +1528 * Checks if the action is complete; used on error to prevent needless retries. +1529 * Does not synchronize, assuming element index/field accesses are atomic. +1530 * This is an opportunistic optimization check, doesn't have to be strict. +1531 * @param index Original action index. +1532 * @param row Original request. +1533 */ +1534 private boolean isActionComplete(int index, Row row) { +1535 if (!isReplicaGet(row)) return false; +1536 Object resObj = results[index]; +1537 return (resObj != null) && (!(resObj instanceof ReplicaResultState) +1538 || ((ReplicaResultState)resObj).callCount == 0); +1539 } +1540 +1541 /** +1542 * Tries to set the result or error for a particular action as if there were no replica calls. +1543 * @return null if successful; replica state if there were in fact replica calls. +1544 */ +1545 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, +1546 Object result, ServerName server, boolean isFromReplica) { +1547 Object resObj = null; +1548 if (!isReplicaGet(row)) { +1549 if (isFromReplica) { +1550 throw new AssertionError("Unexpected stale result for " + row); +1551 } +1552 results[index] = result; +1553 } else { +1554 synchronized (replicaResultLock) { +1555 resObj = results[index]; +1556 if (resObj == null) { +1557 if (isFromReplica) { +1558 throw new AssertionError("Unexpected stale result for " + row); +1559 } +1560 results[index] = result; +1561 } +1562 } 1563 } 1564 -1565 if (resObj == null) { -1566 // resObj is null - no replica calls were made. -1567 decActionCounter(index); -1568 return null; -1569 } -1570 return rrs; -1571 } -1572 -1573 private void decActionCounter(int index) { -1574 long actionsRemaining = actionsInProgress.decrementAndGet(); -1575 if (actionsRemaining < 0) { -1576 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); -1577 throw new AssertionError(error); -1578 } else if (actionsRemaining == 0) { -1579 synchronized (actionsInProgress) { -1580 actionsInProgress.notifyAll(); -1581 } -1582 } -1583 } -1584 -1585 private String buildDetailedErrorMsg(String string, int index) { -1586 StringBuilder error = new StringBuilder(128); -1587 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") -1588 .append(actionsInProgress.get()).append("; replica gets: "); -1589 if (replicaGetIndices != null) { -1590 for (int i = 0; i < replicaGetIndices.length; ++i) { -1591 error.append(replicaGetIndices[i]).append(", "); -1592 } -1593 } else { -1594 error.append(hasAnyReplicaGets ? "all" : "none"); -1595 } -1596 error.append("; results "); -1597 if (results != null) { -1598 for (int i = 0; i < results.length; ++i) { -1599 Object o = results[i]; -1600 error.append(((o == null) ? "null" : o.toString())).append(", "); -1601 } +1565 ReplicaResultState rrs = +1566 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; +1567 if (rrs == null && isError) { +1568 // The resObj is not replica state (null or already set). +1569 errors.add((Throwable)result, row, server); +1570 } +1571 +1572 if (resObj == null) { +1573 // resObj is null - no replica calls were made. +1574 decActionCounter(index); +1575 return null; +1576 } +1577 return rrs; +1578 } +1579 +1580 private void decActionCounter(int index) { +1581 long actionsRemaining = actionsInProgress.decrementAndGet(); +1582 if (actionsRemaining < 0) { +1583 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); +1584 throw new AssertionError(error); +1585 } else if (actionsRemaining == 0) { +1586 synchronized (actionsInProgress) { +1587 actionsInProgress.notifyAll(); +1588 } +1589 } +1590 } +1591 +1592 private String buildDetailedErrorMsg(String string, int index) { +1593 StringBuilder error = new StringBuilder(128); +1594 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") +1595 .append(actionsInProgress.get()).append("; replica gets: "); +1596 if (replicaGetIndices != null) { +1597 for (int i = 0; i < replicaGetIndices.length; ++i) { +1598 error.append(replicaGetIndices[i]).append(", "); +1599 } +1600 } else { +1601 error.append(hasAnyReplicaGets ? "all" : "none"); 1602 } -1603 return error.toString(); -1604 } -1605 -1606 @Override -1607 public void waitUntilDone() throws InterruptedIOException { -1608 try { -1609 waitUntilDone(Long.MAX_VALUE); -1610 } catch (InterruptedException iex) { -1611 throw new InterruptedIOException(iex.getMessage()); -1612 } finally { -1613 if (callsInProgress != null) { -1614 for (MultiServerCallable<Row> clb : callsInProgress) { -1615 clb.cancel(); -1616 } -1617 } -1618 } -1619 } -1620 -1621 private boolean waitUntilDone(long cutoff) throws InterruptedException { -1622 boolean hasWait = cutoff != Long.MAX_VALUE; -1623 long lastLog = EnvironmentEdgeManager.currentTime(); -1624 long currentInProgress; -1625 while (0 != (currentInProgress = actionsInProgress.get())) { -1626 long now = EnvironmentEdgeManager.currentTime(); -1627 if (hasWait && (now * 1000L) > cutoff) { -1628 return false; -1629 } -1630 if (!hasWait) { // Only log if wait is infinite. -1631 if (now > lastLog + 10000) { -1632 lastLog = now; -1633 LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish"); -1634 } -1635 } -1636 synchronized (actionsInProgress) { -1637 if (actionsInProgress.get() == 0) break; -1638 if (!hasWait) { -1639 actionsInProgress.wait(100); -1640 } else { -1641 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); -1642 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); -1643 } -1644 } -1645 } -1646 return true; -1647 } -1648 -1649 @Override -1650 public boolean hasError() { -1651 return errors.hasErrors(); -1652 } -1653 -1654 @Override -1655 public List<? extends Row> getFailedOperations() { -1656 return errors.actions; -1657 } -1658 -1659 @Override -1660 public RetriesExhaustedWithDetailsException getErrors() { -1661 return errors.makeException(); -1662 } -1663 -1664 @Override -1665 public Object[] getResults() throws InterruptedIOException { -1666 waitUntilDone(); -1667 return results; -1668 } -1669 } +1603 error.append("; results "); +1604 if (results != null) { +1605 for (int i = 0; i < results.length; ++i) { +1606 Object o = results[i]; +1607 error.append(((o == null) ? "null" : o.toString())).append(", "); +1608 } +1609 } +1610 return error.toString(); +1611 } +1612 +1613 @Override +1614 public void waitUntilDone() throws InterruptedIOException { +1615 try { +1616 waitUntilDone(Long.MAX_VALUE); +1617 } catch (InterruptedException iex) { +1618 throw new InterruptedIOException(iex.getMessage()); +1619 } finally { +1620 if (callsInProgress != null) { +1621 for (MultiServerCallable<Row> clb : callsInProgress) { +1622 clb.cancel(); +1623 } +1624 } +1625 } +1626 } +1627 +1628 private boolean waitUntilDone(long cutoff) throws InterruptedException { +1629 boolean hasWait = cutoff != Long.MAX_VALUE; +1630 long lastLog = EnvironmentEdgeManager.currentTime(); +1631 long currentInProgress; +1632 while (0 != (currentInProgress = actionsInProgress.get())) { +1633 long now = EnvironmentEdgeManager.currentTime(); +1634 if (hasWait && (now * 1000L) > cutoff) { +1635 return false; +1636 } +1637 if (!hasWait) { // Only log if wait is infinite. +1638 if (now > lastLog + 10000) { +1639 lastLog = now; +1640 LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish"); +1641 } +1642 } +1643 synchronized (actionsInProgress) { +1644 if (actionsInProgress.get() == 0) break; +1645 if (!hasWait) { +1646 actionsInProgress.wait(100); +1647 } else { +1648 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); +1649 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); +1650 } +1651 } +1652 } +1653 return true; +1654 } +1655 +1656 @Override +1657 public boolean hasError() { +1658 return errors.hasErrors(); +1659 } +1660 +1661 @Override +1662 public List<? extends Row> getFailedOperations() { +1663 return errors.actions; +1664 } +1665 +1666 @Override +1667 public RetriesExhaustedWithDetailsException getErrors() { +1668 return errors.makeException(); +1669 } 1670 -1671 @VisibleForTesting -1672 /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ -1673 protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( -1674 TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, -1675 Batch.Callback<CResult> callback, Object[] results, boolean needResults) { -1676 return new AsyncRequestFutureImpl<CResult>( -1677 tableName, actions, nonceGroup, getPool(pool), needResults, results, callback); -1678 } -1679 -1680 /** -1681 * Create a callable. Isolated to be easily overridden in the tests. -1682 */ -1683 @VisibleForTesting -1684 protected MultiServerCallable<Row> createCallable(final ServerName server, -1685 TableName tableName, final MultiAction<Row> multi) { -1686 return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi); -1687 } -1688 -1689 /** -1690 * Create a caller. Isolated to be easily overridden in the tests. -1691 */ -1692 @VisibleForTesting -1693 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { -1694 return rpcCallerFactory.<MultiResponse> newCaller(); -1695 } -1696 -1697 @VisibleForTesting -1698 /** Waits until all outstanding tasks are done. Used in tests. */ -1699 void waitUntilDone() throws InterruptedIOException { -1700 waitForMaximumCurrentTasks(0); -1701 } -1702 -1703 /** Wait until the async does not have more than max tasks in progress. */ -1704 private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { -1705 long lastLog = EnvironmentEdgeManager.currentTime(); -1706 long currentInProgress, oldInProgress = Long.MAX_VALUE; -1707 while ((currentInProgress = this.tasksInProgress.get()) > max) { -1708 if (oldInProgress != currentInProgress) { // Wait for in progress to change. -1709 long now = EnvironmentEdgeManager.currentTime(); -1710 if (now > lastLog + 10000) { -1711 lastLog = now; -1712 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" -1713 + max + ", tasksInProgress=" + currentInProgress); -1714 } -1715 } -1716 oldInProgress = currentInProgress; -1717 try { -1718 synchronized (this.tasksInProgress) { -1719 if (tasksInProgress.get() != oldInProgress) break; -1720 this.tasksInProgress.wait(100); +1671 @Override +1672 public Object[] getResults() throws InterruptedIOException { +1673 waitUntilDone(); +1674 return results; +1675 } +1676 } +1677 +1678 @VisibleForTesting +1679 /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ +1680 protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( +1681 TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, +1682 Batch.Callback<CResult> callback, Object[] results, boolean needResults) { +1683 return new AsyncRequestFutureImpl<CResult>( +1684 tableName, actions, nonceGroup, getPool(pool), needResults, results, callback); +1685 } +1686 +1687 /** +1688 * Create a callable. Isolated to be easily overridden in the tests. +1689 */ +1690 @VisibleForTesting +1691 protected MultiServerCallable<Row> createCallable(final ServerName server, +1692 TableName tableName, final MultiAction<Row> multi) { +1693 return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi); +1694 } +1695 +1696 /** +1697 * Create a caller. Isolated to be easily overridden in the tests. +1698 */ +1699 @VisibleForTesting +1700 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { +1701 return rpcCallerFactory.<MultiResponse> newCaller(); +1702 } +1703 +1704 @VisibleForTesting +1705 /** Waits until all outstanding tasks are done. Used in tests. */ +1706 void waitUntilDone() throws InterruptedIOException { +1707 waitForMaximumCurrentTasks(0); +1708 } +1709 +1710 /** Wait until the async does not have more than max tasks in progress. */ +1711 private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { +1712 long lastLog = EnvironmentEdgeManager.currentTime(); +1713 long currentInProgress, oldInProgress = Long.MAX_VALUE; +1714 while ((currentInProgress = this.tasksInProgress.get()) > max) { +1715 if (oldInProgress != currentInProgress) { // Wait for in progress to change. +1716 long now = EnvironmentEdgeManager.currentTime(); +1717 if (now > lastLog + 10000) { +1718 lastLog = now; +1719 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" +1720 + max + ", tasksInProgress=" + currentInProgress); 1721 } -1722 } catch (InterruptedException e) { -1723 throw new InterruptedIOException("#" + id + ", interrupted." + -1724 " currentNumberOfTask=" + currentInProgress); -1725 } -1726 } -1727 } -1728 -1729 /** -1730 * Only used w/useGlobalErrors ctor argument, for HTable backward compat. -1731 * @return Whether there were any errors in any request since the last time -1732 * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created. -1733 */ -1734 public boolean hasError() { -1735 return globalErrors.hasErrors(); -1736 } -1737 -1738 /** -1739 * Only used w/useGlobalErrors ctor argument, for HTable backward compat. -1740 * Waits for all previous operations to finish, and returns errors and (optionally) -1741 * failed operations themselves. -1742 * @param failedRows an optional list into which the rows that failed since the last time -1743 * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. -1744 * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} -1745 * was called, or AP was created. -1746 */ -1747 public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( -1748 List<Row> failedRows) throws InterruptedIOException { -1749 waitForMaximumCurrentTasks(0); -1750 if (!globalErrors.hasErrors()) { -1751 return null; -1752 } -1753 if (failedRows != null) { -1754 failedRows.addAll(globalErrors.actions); -1755 } -1756 RetriesExhaustedWithDetailsException result = globalErrors.makeException(); -1757 globalErrors.clear(); -1758 return result; -1759 } -1760 -1761 /** -1762 * increment the tasks counters for a given set of regions. MT safe. -1763 */ -1764 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) { -1765 tasksInProgress.incrementAndGet(); -1766 -1767 AtomicInteger serverCnt = taskCounterPerServer.get(sn); -1768 if (serverCnt == null) { -1769 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger()); -1770 serverCnt = taskCounterPerServer.get(sn); -1771 } -1772 serverCnt.incrementAndGet(); +1722 } +1723 oldInProgress = currentInProgress; +1724 try { +1725 synchronized (this.tasksInProgress) { +1726 if (tasksInProgress.get() != oldInProgress) break; +1727 this.tasksInProgress.wait(100); +1728 } +1729 } catch (InterruptedException e) { +1730 throw new InterruptedIOException("#" + id + ", interrupted." + +1731 " currentNumberOfTask=" + currentInProgress); +1732 } +1733 } +1734 } +1735 +1736 /** +1737 * Only used w/useGlobalErrors ctor argument, for HTable backward compat. +1738 * @return Whether there were any errors in any request since the last time +1739 * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created. +1740 */ +1741 public boolean hasError() { +1742 return globalErrors.hasErrors(); +1743 } +1744 +1745 /** +1746 * Only used w/useGlobalErrors ctor argument, for HTable backward compat. +1747 * Waits for all previous operations to finish, and returns errors and (optionally) +1748 * failed operations themselves. +1749 * @param failedRows an optional list into which the rows that failed since the last time +1750 * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. +1751 * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} +1752 * was called, or AP was created. +1753 */ +1754 public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( +1755 List<Row> failedRows) throws InterruptedIOException { +1756 waitForMaximumCurrentTasks(0); +1757 if (!globalErrors.hasErrors()) { +1758 return null; +1759 } +1760 if (failedRows != null) { +1761 failedRows.addAll(globalErrors.actions); +1762 } +1763 RetriesExhaustedWithDetailsException result = globalErrors.makeException(); +1764 globalErrors.clear(); +1765 return result; +1766 } +1767 +1768 /** +1769 * increment the tasks counters for a given set of regions. MT safe. +1770 */ +1771 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) { +1772 tasksInProgress.incrementAndGet(); 1773 -1774 for (byte[] regBytes : regions) { -1775 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); -1776 if (regionCnt == null) { -1777 regionCnt = new AtomicInteger(); -1778 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt); -1779 if (oldCnt != null) { -1780 regionCnt = oldCnt; -1781 } -1782 } -1783 regionCnt.incrementAndGet(); -1784 } -1785 } -1786 -1787 /** -1788 * Decrements the counters for a given region and the region server. MT Safe. -1789 */ -1790 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) { -1791 for (byte[] regBytes : regions) { -1792 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); -1793 regionCnt.decrementAndGet(); -1794 } -1795 -1796 taskCounterPerServer.get(sn).decrementAndGet(); -1797 tasksInProgress.decrementAndGet(); -1798 synchronized (tasksInProgress) { -1799 tasksInProgress.notifyAll(); -1800 } -1801 } +1774 AtomicInteger serverCnt = taskCounterPerServer.get(sn); +1775 if (serverCnt == null) { +1776 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger()); +1777 serverCnt = taskCounterPerServer.get(sn); +1778 } +1779 serverCnt.incrementAndGet(); +1780 +1781 for (byte[] regBytes : regions) { +1782 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); +1783 if (regionCnt == null) { +1784 regionCnt = new AtomicInteger(); +1785 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt); +1786 if (oldCnt != null) { +1787 regionCnt = oldCnt; +1788 } +1789 } +1790 regionCnt.incrementAndGet(); +1791 } +1792 } +1793 +1794 /** +1795 * Decrements the counters for a given region and the region server. MT Safe. +1796 */ +1797 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) { +1798 for (byte[] regBytes : regions) { +1799 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); +1800 regionCnt.decrementAndGet(); +1801 } 1802 -1803 /** -1804 * Creates the server error tracker to use inside process. -1805 * Currently, to preserve the main assumption about current retries, and to work well with -1806 * the retry-limit-based calculation, the calculation is local per Process object. -1807 * We may benefit from connection-wide tracking of server errors. -1808 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection -1809 */ -1810 protected ConnectionImplementation.Ser