Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 48E3D200CE6 for ; Tue, 1 Aug 2017 17:07:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4709B16747A; Tue, 1 Aug 2017 15:07:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2B5A2167413 for ; Tue, 1 Aug 2017 17:07:51 +0200 (CEST) Received: (qmail 96263 invoked by uid 500); 1 Aug 2017 15:07:48 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 93972 invoked by uid 99); 1 Aug 2017 15:07:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Aug 2017 15:07:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A4E92F3342; Tue, 1 Aug 2017 15:07:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 01 Aug 2017 15:08:06 -0000 Message-Id: <1797cac1b0a24acfaaf0a99ff5428db3@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [24/43] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Tue, 01 Aug 2017 15:07:53 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/110df817/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html index 596b800..681f621 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html @@ -1456,364 +1456,366 @@ 1448 synchronized (regionNode) { 1449 State state = regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN); 1450 if (isMetaRegion(hri)) { -1451 setMetaInitialized(hri, true); -1452 } -1453 regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); -1454 // TODO: OPENING Updates hbase:meta too... we need to do both here and there? -1455 // That is a lot of hbase:meta writing. -1456 regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, -1457 regionNode.getRegionLocation(), regionNode.getLastHost(), regionNode.getOpenSeqNum(), -1458 regionNode.getProcedure().getProcId()); -1459 sendRegionOpenedNotification(hri, regionNode.getRegionLocation()); -1460 } -1461 } -1462 -1463 public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException { -1464 final HRegionInfo hri = regionNode.getRegionInfo(); -1465 synchronized (regionNode) { -1466 State state = regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); -1467 // Set meta has not initialized early. so people trying to create/edit tables will wait -1468 if (isMetaRegion(hri)) { -1469 setMetaInitialized(hri, false); -1470 } -1471 regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); -1472 regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, -1473 regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM, -1474 regionNode.getProcedure().getProcId()); -1475 } -1476 -1477 // update the operation count metrics -1478 metrics.incrementOperationCounter(); -1479 } -1480 -1481 public void undoRegionAsClosing(final RegionStateNode regionNode) { -1482 // TODO: Metrics. Do opposite of metrics.incrementOperationCounter(); -1483 // There is nothing to undo? -1484 } -1485 -1486 public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException { -1487 final HRegionInfo hri = regionNode.getRegionInfo(); -1488 synchronized (regionNode) { -1489 State state = regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE); -1490 regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); -1491 regionNode.setLastHost(regionNode.getRegionLocation()); -1492 regionNode.setRegionLocation(null); -1493 regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, -1494 regionNode.getRegionLocation()/*null*/, regionNode.getLastHost(), -1495 HConstants.NO_SEQNUM, regionNode.getProcedure().getProcId()); -1496 sendRegionClosedNotification(hri); -1497 } -1498 } -1499 -1500 public void markRegionAsSplit(final HRegionInfo parent, final ServerName serverName, -1501 final HRegionInfo daughterA, final HRegionInfo daughterB) -1502 throws IOException { -1503 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. -1504 // The parent stays in regionStates until cleared when removed by CatalogJanitor. -1505 // Update its state in regionStates to it shows as offline and split when read -1506 // later figuring what regions are in a table and what are not: see -1507 // regionStates#getRegionsOfTable -1508 final RegionStateNode node = regionStates.getOrCreateRegionNode(parent); -1509 node.setState(State.SPLIT); -1510 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); -1511 if (shouldAssignFavoredNodes(parent)) { -1512 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); -1513 ((FavoredNodesPromoter)getBalancer()). -1514 generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); -1515 } -1516 } -1517 -1518 /** -1519 * When called here, the merge has happened. The two merged regions have been -1520 * unassigned and the above markRegionClosed has been called on each so they have been -1521 * disassociated from a hosting Server. The merged region will be open after this call. The -1522 * merged regions are removed from hbase:meta below> Later they are deleted from the filesystem -1523 * by the catalog janitor running against hbase:meta. It notices when the merged region no -1524 * longer holds references to the old regions. -1525 */ -1526 public void markRegionAsMerged(final HRegionInfo child, final ServerName serverName, -1527 final HRegionInfo mother, final HRegionInfo father) throws IOException { -1528 final RegionStateNode node = regionStates.getOrCreateRegionNode(child); -1529 node.setState(State.MERGED); -1530 regionStates.deleteRegion(mother); -1531 regionStates.deleteRegion(father); -1532 regionStateStore.mergeRegions(child, mother, father, serverName); -1533 if (shouldAssignFavoredNodes(child)) { -1534 ((FavoredNodesPromoter)getBalancer()). -1535 generateFavoredNodesForMergedRegion(child, mother, father); -1536 } -1537 } -1538 -1539 /* -1540 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region -1541 * belongs to a non-system table. -1542 */ -1543 private boolean shouldAssignFavoredNodes(HRegionInfo region) { -1544 return this.shouldAssignRegionsWithFavoredNodes && -1545 FavoredNodesManager.isFavoredNodeApplicable(region); -1546 } -1547 -1548 // ============================================================================================ -1549 // Assign Queue (Assign/Balance) +1451 master.getTableStateManager().setTableState(TableName.META_TABLE_NAME, +1452 TableState.State.ENABLED); +1453 setMetaInitialized(hri, true); +1454 } +1455 regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); +1456 // TODO: OPENING Updates hbase:meta too... we need to do both here and there? +1457 // That is a lot of hbase:meta writing. +1458 regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, +1459 regionNode.getRegionLocation(), regionNode.getLastHost(), regionNode.getOpenSeqNum(), +1460 regionNode.getProcedure().getProcId()); +1461 sendRegionOpenedNotification(hri, regionNode.getRegionLocation()); +1462 } +1463 } +1464 +1465 public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException { +1466 final HRegionInfo hri = regionNode.getRegionInfo(); +1467 synchronized (regionNode) { +1468 State state = regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); +1469 // Set meta has not initialized early. so people trying to create/edit tables will wait +1470 if (isMetaRegion(hri)) { +1471 setMetaInitialized(hri, false); +1472 } +1473 regionStates.addRegionToServer(regionNode.getRegionLocation(), regionNode); +1474 regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, +1475 regionNode.getRegionLocation(), regionNode.getLastHost(), HConstants.NO_SEQNUM, +1476 regionNode.getProcedure().getProcId()); +1477 } +1478 +1479 // update the operation count metrics +1480 metrics.incrementOperationCounter(); +1481 } +1482 +1483 public void undoRegionAsClosing(final RegionStateNode regionNode) { +1484 // TODO: Metrics. Do opposite of metrics.incrementOperationCounter(); +1485 // There is nothing to undo? +1486 } +1487 +1488 public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException { +1489 final HRegionInfo hri = regionNode.getRegionInfo(); +1490 synchronized (regionNode) { +1491 State state = regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE); +1492 regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); +1493 regionNode.setLastHost(regionNode.getRegionLocation()); +1494 regionNode.setRegionLocation(null); +1495 regionStateStore.updateRegionLocation(regionNode.getRegionInfo(), state, +1496 regionNode.getRegionLocation()/*null*/, regionNode.getLastHost(), +1497 HConstants.NO_SEQNUM, regionNode.getProcedure().getProcId()); +1498 sendRegionClosedNotification(hri); +1499 } +1500 } +1501 +1502 public void markRegionAsSplit(final HRegionInfo parent, final ServerName serverName, +1503 final HRegionInfo daughterA, final HRegionInfo daughterB) +1504 throws IOException { +1505 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. +1506 // The parent stays in regionStates until cleared when removed by CatalogJanitor. +1507 // Update its state in regionStates to it shows as offline and split when read +1508 // later figuring what regions are in a table and what are not: see +1509 // regionStates#getRegionsOfTable +1510 final RegionStateNode node = regionStates.getOrCreateRegionNode(parent); +1511 node.setState(State.SPLIT); +1512 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); +1513 if (shouldAssignFavoredNodes(parent)) { +1514 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); +1515 ((FavoredNodesPromoter)getBalancer()). +1516 generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); +1517 } +1518 } +1519 +1520 /** +1521 * When called here, the merge has happened. The two merged regions have been +1522 * unassigned and the above markRegionClosed has been called on each so they have been +1523 * disassociated from a hosting Server. The merged region will be open after this call. The +1524 * merged regions are removed from hbase:meta below> Later they are deleted from the filesystem +1525 * by the catalog janitor running against hbase:meta. It notices when the merged region no +1526 * longer holds references to the old regions. +1527 */ +1528 public void markRegionAsMerged(final HRegionInfo child, final ServerName serverName, +1529 final HRegionInfo mother, final HRegionInfo father) throws IOException { +1530 final RegionStateNode node = regionStates.getOrCreateRegionNode(child); +1531 node.setState(State.MERGED); +1532 regionStates.deleteRegion(mother); +1533 regionStates.deleteRegion(father); +1534 regionStateStore.mergeRegions(child, mother, father, serverName); +1535 if (shouldAssignFavoredNodes(child)) { +1536 ((FavoredNodesPromoter)getBalancer()). +1537 generateFavoredNodesForMergedRegion(child, mother, father); +1538 } +1539 } +1540 +1541 /* +1542 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region +1543 * belongs to a non-system table. +1544 */ +1545 private boolean shouldAssignFavoredNodes(HRegionInfo region) { +1546 return this.shouldAssignRegionsWithFavoredNodes && +1547 FavoredNodesManager.isFavoredNodeApplicable(region); +1548 } +1549 1550 // ============================================================================================ -1551 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); -1552 private final ReentrantLock assignQueueLock = new ReentrantLock(); -1553 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); -1554 -1555 /** -1556 * Add the assign operation to the assignment queue. -1557 * The pending assignment operation will be processed, -1558 * and each region will be assigned by a server using the balancer. -1559 */ -1560 protected void queueAssign(final RegionStateNode regionNode) { -1561 getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent()); -1562 -1563 // TODO: quick-start for meta and the other sys-tables? -1564 assignQueueLock.lock(); -1565 try { -1566 pendingAssignQueue.add(regionNode); -1567 if (regionNode.isSystemTable() || -1568 pendingAssignQueue.size() == 1 || -1569 pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { -1570 assignQueueFullCond.signal(); -1571 } -1572 } finally { -1573 assignQueueLock.unlock(); -1574 } -1575 } -1576 -1577 private void startAssignmentThread() { -1578 assignThread = new Thread("AssignmentThread") { -1579 @Override -1580 public void run() { -1581 while (isRunning()) { -1582 processAssignQueue(); -1583 } -1584 pendingAssignQueue.clear(); -1585 } -1586 }; -1587 assignThread.start(); -1588 } -1589 -1590 private void stopAssignmentThread() { -1591 assignQueueSignal(); -1592 try { -1593 while (assignThread.isAlive()) { -1594 assignQueueSignal(); -1595 assignThread.join(250); -1596 } -1597 } catch (InterruptedException e) { -1598 LOG.warn("join interrupted", e); -1599 Thread.currentThread().interrupt(); -1600 } -1601 } -1602 -1603 private void assignQueueSignal() { -1604 assignQueueLock.lock(); -1605 try { -1606 assignQueueFullCond.signal(); -1607 } finally { -1608 assignQueueLock.unlock(); -1609 } -1610 } -1611 -1612 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") -1613 private HashMap<HRegionInfo, RegionStateNode> waitOnAssignQueue() { -1614 HashMap<HRegionInfo, RegionStateNode> regions = null; -1615 -1616 assignQueueLock.lock(); -1617 try { -1618 if (pendingAssignQueue.isEmpty() && isRunning()) { -1619 assignQueueFullCond.await(); -1620 } -1621 -1622 if (!isRunning()) return null; -1623 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); -1624 regions = new HashMap<HRegionInfo, RegionStateNode>(pendingAssignQueue.size()); -1625 for (RegionStateNode regionNode: pendingAssignQueue) { -1626 regions.put(regionNode.getRegionInfo(), regionNode); -1627 } -1628 pendingAssignQueue.clear(); -1629 } catch (InterruptedException e) { -1630 LOG.warn("got interrupted ", e); -1631 Thread.currentThread().interrupt(); -1632 } finally { -1633 assignQueueLock.unlock(); -1634 } -1635 return regions; -1636 } -1637 -1638 private void processAssignQueue() { -1639 final HashMap<HRegionInfo, RegionStateNode> regions = waitOnAssignQueue(); -1640 if (regions == null || regions.size() == 0 || !isRunning()) { -1641 return; -1642 } -1643 -1644 if (LOG.isTraceEnabled()) { -1645 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); -1646 } -1647 -1648 // TODO: Optimize balancer. pass a RegionPlan? -1649 final HashMap<HRegionInfo, ServerName> retainMap = new HashMap<HRegionInfo, ServerName>(); -1650 final List<HRegionInfo> rrList = new ArrayList<HRegionInfo>(); -1651 for (RegionStateNode regionNode: regions.values()) { -1652 if (regionNode.getRegionLocation() != null) { -1653 retainMap.put(regionNode.getRegionInfo(), regionNode.getRegionLocation()); -1654 } else { -1655 rrList.add(regionNode.getRegionInfo()); -1656 } -1657 } -1658 -1659 // TODO: connect with the listener to invalidate the cache -1660 final LoadBalancer balancer = getBalancer(); -1661 -1662 // TODO use events -1663 List<ServerName> servers = master.getServerManager().createDestinationServersList(); -1664 for (int i = 0; servers.size() < 1; ++i) { -1665 if (i % 4 == 0) { -1666 LOG.warn("no server available, unable to find a location for " + regions.size() + -1667 " unassigned regions. waiting"); -1668 } -1669 -1670 // the was AM killed -1671 if (!isRunning()) { -1672 LOG.debug("aborting assignment-queue with " + regions.size() + " not assigned"); -1673 return; -1674 } -1675 -1676 Threads.sleep(250); -1677 servers = master.getServerManager().createDestinationServersList(); -1678 } -1679 -1680 final boolean isTraceEnabled = LOG.isTraceEnabled(); -1681 if (isTraceEnabled) { -1682 LOG.trace("available servers count=" + servers.size() + ": " + servers); -1683 } -1684 -1685 // ask the balancer where to place regions -1686 if (!retainMap.isEmpty()) { -1687 if (isTraceEnabled) { -1688 LOG.trace("retain assign regions=" + retainMap); -1689 } -1690 try { -1691 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); -1692 } catch (HBaseIOException e) { -1693 LOG.warn("unable to retain assignment", e); -1694 addToPendingAssignment(regions, retainMap.keySet()); -1695 } -1696 } -1697 -1698 // TODO: Do we need to split retain and round-robin? -1699 // the retain seems to fallback to round-robin/random if the region is not in the map. -1700 if (!rrList.isEmpty()) { -1701 Collections.sort(rrList); -1702 if (isTraceEnabled) { -1703 LOG.trace("round robin regions=" + rrList); -1704 } -1705 try { -1706 acceptPlan(regions, balancer.roundRobinAssignment(rrList, servers)); -1707 } catch (HBaseIOException e) { -1708 LOG.warn("unable to round-robin assignment", e); -1709 addToPendingAssignment(regions, rrList); -1710 } -1711 } -1712 } -1713 -1714 private void acceptPlan(final HashMap<HRegionInfo, RegionStateNode> regions, -1715 final Map<ServerName, List<HRegionInfo>> plan) throws HBaseIOException { -1716 final ProcedureEvent[] events = new ProcedureEvent[regions.size()]; -1717 final long st = System.currentTimeMillis(); -1718 -1719 if (plan == null) { -1720 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); -1721 } -1722 -1723 if (plan.isEmpty()) return; +1551 // Assign Queue (Assign/Balance) +1552 // ============================================================================================ +1553 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); +1554 private final ReentrantLock assignQueueLock = new ReentrantLock(); +1555 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); +1556 +1557 /** +1558 * Add the assign operation to the assignment queue. +1559 * The pending assignment operation will be processed, +1560 * and each region will be assigned by a server using the balancer. +1561 */ +1562 protected void queueAssign(final RegionStateNode regionNode) { +1563 getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent()); +1564 +1565 // TODO: quick-start for meta and the other sys-tables? +1566 assignQueueLock.lock(); +1567 try { +1568 pendingAssignQueue.add(regionNode); +1569 if (regionNode.isSystemTable() || +1570 pendingAssignQueue.size() == 1 || +1571 pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { +1572 assignQueueFullCond.signal(); +1573 } +1574 } finally { +1575 assignQueueLock.unlock(); +1576 } +1577 } +1578 +1579 private void startAssignmentThread() { +1580 assignThread = new Thread("AssignmentThread") { +1581 @Override +1582 public void run() { +1583 while (isRunning()) { +1584 processAssignQueue(); +1585 } +1586 pendingAssignQueue.clear(); +1587 } +1588 }; +1589 assignThread.start(); +1590 } +1591 +1592 private void stopAssignmentThread() { +1593 assignQueueSignal(); +1594 try { +1595 while (assignThread.isAlive()) { +1596 assignQueueSignal(); +1597 assignThread.join(250); +1598 } +1599 } catch (InterruptedException e) { +1600 LOG.warn("join interrupted", e); +1601 Thread.currentThread().interrupt(); +1602 } +1603 } +1604 +1605 private void assignQueueSignal() { +1606 assignQueueLock.lock(); +1607 try { +1608 assignQueueFullCond.signal(); +1609 } finally { +1610 assignQueueLock.unlock(); +1611 } +1612 } +1613 +1614 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") +1615 private HashMap<HRegionInfo, RegionStateNode> waitOnAssignQueue() { +1616 HashMap<HRegionInfo, RegionStateNode> regions = null; +1617 +1618 assignQueueLock.lock(); +1619 try { +1620 if (pendingAssignQueue.isEmpty() && isRunning()) { +1621 assignQueueFullCond.await(); +1622 } +1623 +1624 if (!isRunning()) return null; +1625 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); +1626 regions = new HashMap<HRegionInfo, RegionStateNode>(pendingAssignQueue.size()); +1627 for (RegionStateNode regionNode: pendingAssignQueue) { +1628 regions.put(regionNode.getRegionInfo(), regionNode); +1629 } +1630 pendingAssignQueue.clear(); +1631 } catch (InterruptedException e) { +1632 LOG.warn("got interrupted ", e); +1633 Thread.currentThread().interrupt(); +1634 } finally { +1635 assignQueueLock.unlock(); +1636 } +1637 return regions; +1638 } +1639 +1640 private void processAssignQueue() { +1641 final HashMap<HRegionInfo, RegionStateNode> regions = waitOnAssignQueue(); +1642 if (regions == null || regions.size() == 0 || !isRunning()) { +1643 return; +1644 } +1645 +1646 if (LOG.isTraceEnabled()) { +1647 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); +1648 } +1649 +1650 // TODO: Optimize balancer. pass a RegionPlan? +1651 final HashMap<HRegionInfo, ServerName> retainMap = new HashMap<HRegionInfo, ServerName>(); +1652 final List<HRegionInfo> rrList = new ArrayList<HRegionInfo>(); +1653 for (RegionStateNode regionNode: regions.values()) { +1654 if (regionNode.getRegionLocation() != null) { +1655 retainMap.put(regionNode.getRegionInfo(), regionNode.getRegionLocation()); +1656 } else { +1657 rrList.add(regionNode.getRegionInfo()); +1658 } +1659 } +1660 +1661 // TODO: connect with the listener to invalidate the cache +1662 final LoadBalancer balancer = getBalancer(); +1663 +1664 // TODO use events +1665 List<ServerName> servers = master.getServerManager().createDestinationServersList(); +1666 for (int i = 0; servers.size() < 1; ++i) { +1667 if (i % 4 == 0) { +1668 LOG.warn("no server available, unable to find a location for " + regions.size() + +1669 " unassigned regions. waiting"); +1670 } +1671 +1672 // the was AM killed +1673 if (!isRunning()) { +1674 LOG.debug("aborting assignment-queue with " + regions.size() + " not assigned"); +1675 return; +1676 } +1677 +1678 Threads.sleep(250); +1679 servers = master.getServerManager().createDestinationServersList(); +1680 } +1681 +1682 final boolean isTraceEnabled = LOG.isTraceEnabled(); +1683 if (isTraceEnabled) { +1684 LOG.trace("available servers count=" + servers.size() + ": " + servers); +1685 } +1686 +1687 // ask the balancer where to place regions +1688 if (!retainMap.isEmpty()) { +1689 if (isTraceEnabled) { +1690 LOG.trace("retain assign regions=" + retainMap); +1691 } +1692 try { +1693 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); +1694 } catch (HBaseIOException e) { +1695 LOG.warn("unable to retain assignment", e); +1696 addToPendingAssignment(regions, retainMap.keySet()); +1697 } +1698 } +1699 +1700 // TODO: Do we need to split retain and round-robin? +1701 // the retain seems to fallback to round-robin/random if the region is not in the map. +1702 if (!rrList.isEmpty()) { +1703 Collections.sort(rrList); +1704 if (isTraceEnabled) { +1705 LOG.trace("round robin regions=" + rrList); +1706 } +1707 try { +1708 acceptPlan(regions, balancer.roundRobinAssignment(rrList, servers)); +1709 } catch (HBaseIOException e) { +1710 LOG.warn("unable to round-robin assignment", e); +1711 addToPendingAssignment(regions, rrList); +1712 } +1713 } +1714 } +1715 +1716 private void acceptPlan(final HashMap<HRegionInfo, RegionStateNode> regions, +1717 final Map<ServerName, List<HRegionInfo>> plan) throws HBaseIOException { +1718 final ProcedureEvent[] events = new ProcedureEvent[regions.size()]; +1719 final long st = System.currentTimeMillis(); +1720 +1721 if (plan == null) { +1722 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); +1723 } 1724 -1725 int evcount = 0; -1726 for (Map.Entry<ServerName, List<HRegionInfo>> entry: plan.entrySet()) { -1727 final ServerName server = entry.getKey(); -1728 for (HRegionInfo hri: entry.getValue()) { -1729 final RegionStateNode regionNode = regions.get(hri); -1730 regionNode.setRegionLocation(server); -1731 events[evcount++] = regionNode.getProcedureEvent(); -1732 } -1733 } -1734 getProcedureScheduler().wakeEvents(evcount, events); -1735 -1736 final long et = System.currentTimeMillis(); -1737 if (LOG.isTraceEnabled()) { -1738 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + -1739 StringUtils.humanTimeDiff(et - st)); -1740 } -1741 } -1742 -1743 private void addToPendingAssignment(final HashMap<HRegionInfo, RegionStateNode> regions, -1744 final Collection<HRegionInfo> pendingRegions) { -1745 assignQueueLock.lock(); -1746 try { -1747 for (HRegionInfo hri: pendingRegions) { -1748 pendingAssignQueue.add(regions.get(hri)); -1749 } -1750 } finally { -1751 assignQueueLock.unlock(); -1752 } -1753 } -1754 -1755 /** -1756 * Get a list of servers that this region can not assign to. -1757 * For system table, we must assign them to a server with highest version. -1758 */ -1759 public List<ServerName> getExcludedServersForSystemTable() { -1760 List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList() -1761 .stream() -1762 .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) -1763 .collect(Collectors.toList()); -1764 if (serverList.isEmpty()) { -1765 return new ArrayList<>(); -1766 } -1767 String highestVersion = Collections.max(serverList, -1768 (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); -1769 return serverList.stream() -1770 .filter((p)->!p.getSecond().equals(highestVersion)) -1771 .map(Pair::getFirst) -1772 .collect(Collectors.toList()); -1773 } -1774 -1775 // ============================================================================================ -1776 // Server Helpers +1725 if (plan.isEmpty()) return; +1726 +1727 int evcount = 0; +1728 for (Map.Entry<ServerName, List<HRegionInfo>> entry: plan.entrySet()) { +1729 final ServerName server = entry.getKey(); +1730 for (HRegionInfo hri: entry.getValue()) { +1731 final RegionStateNode regionNode = regions.get(hri); +1732 regionNode.setRegionLocation(server); +1733 events[evcount++] = regionNode.getProcedureEvent(); +1734 } +1735 } +1736 getProcedureScheduler().wakeEvents(evcount, events); +1737 +1738 final long et = System.currentTimeMillis(); +1739 if (LOG.isTraceEnabled()) { +1740 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + +1741 StringUtils.humanTimeDiff(et - st)); +1742 } +1743 } +1744 +1745 private void addToPendingAssignment(final HashMap<HRegionInfo, RegionStateNode> regions, +1746 final Collection<HRegionInfo> pendingRegions) { +1747 assignQueueLock.lock(); +1748 try { +1749 for (HRegionInfo hri: pendingRegions) { +1750 pendingAssignQueue.add(regions.get(hri)); +1751 } +1752 } finally { +1753 assignQueueLock.unlock(); +1754 } +1755 } +1756 +1757 /** +1758 * Get a list of servers that this region can not assign to. +1759 * For system table, we must assign them to a server with highest version. +1760 */ +1761 public List<ServerName> getExcludedServersForSystemTable() { +1762 List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList() +1763 .stream() +1764 .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) +1765 .collect(Collectors.toList()); +1766 if (serverList.isEmpty()) { +1767 return new ArrayList<>(); +1768 } +1769 String highestVersion = Collections.max(serverList, +1770 (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); +1771 return serverList.stream() +1772 .filter((p)->!p.getSecond().equals(highestVersion)) +1773 .map(Pair::getFirst) +1774 .collect(Collectors.toList()); +1775 } +1776 1777 // ============================================================================================ -1778 @Override -1779 public void serverAdded(final ServerName serverName) { -1780 } -1781 -1782 @Override -1783 public void serverRemoved(final ServerName serverName) { -1784 final ServerStateNode serverNode = regionStates.getServerNode(serverName); -1785 if (serverNode == null) return; -1786 -1787 // just in case, wake procedures waiting for this server report -1788 wakeServerReportEvent(serverNode); -1789 } -1790 -1791 public int getServerVersion(final ServerName serverName) { -1792 final ServerStateNode node = regionStates.getServerNode(serverName); -1793 return node != null ? node.getVersionNumber() : 0; -1794 } -1795 -1796 public void killRegionServer(final ServerName serverName) { -1797 final ServerStateNode serverNode = regionStates.getServerNode(serverName); -1798 killRegionServer(serverNode); -1799 } -1800 -1801 public void killRegionServer(final ServerStateNode serverNode) { -1802 /** Don't do this. Messes up accounting. Let ServerCrashProcedure do this. -1803 for (RegionStateNode regionNode: serverNode.getRegions()) { -1804 regionNode.offline(); -1805 }*/ -1806 master.getServerManager().expireServer(serverNode.getServerName()); -1807 } -1808} +1778 // Server Helpers +1779 // ============================================================================================ +1780 @Override +1781 public void serverAdded(final ServerName serverName) { +1782 } +1783 +1784 @Override +1785 public void serverRemoved(final ServerName serverName) { +1786 final ServerStateNode serverNode = regionStates.getServerNode(serverName); +1787 if (serverNode == null) return; +1788 +1789 // just in case, wake procedures waiting for this server report +1790 wakeServerReportEvent(serverNode); +1791 } +1792 +1793 public int getServerVersion(final ServerName serverName) { +1794 final ServerStateNode node = regionStates.getServerNode(serverName); +1795 return node != null ? node.getVersionNumber() : 0; +1796 } +1797 +1798 public void killRegionServer(final ServerName serverName) { +1799 final ServerStateNode serverNode = regionStates.getServerNode(serverName); +1800 killRegionServer(serverNode); +1801 } +1802 +1803 public void killRegionServer(final ServerStateNode serverNode) { +1804 /** Don't do this. Messes up accounting. Let ServerCrashProcedure do this. +1805 for (RegionStateNode regionNode: serverNode.getRegions()) { +1806 regionNode.offline(); +1807 }*/ +1808 master.getServerManager().expireServer(serverNode.getServerName()); +1809 } +1810}