Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B4240200D52 for ; Sat, 2 Dec 2017 16:17:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B2FA8160C23; Sat, 2 Dec 2017 15:17:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D1F6E160C24 for ; Sat, 2 Dec 2017 16:17:53 +0100 (CET) Received: (qmail 95990 invoked by uid 500); 2 Dec 2017 15:17:51 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 95515 invoked by uid 99); 2 Dec 2017 15:17:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Dec 2017 15:17:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C0441F60D3; Sat, 2 Dec 2017 15:17:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Sat, 02 Dec 2017 15:18:03 -0000 Message-Id: <7901e355d32741ae82fc1c4bd806808a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/32] hbase-site git commit: Published site at . archived-at: Sat, 02 Dec 2017 15:17:56 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/29b27596/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegionServer.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegionServer.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegionServer.html index 269105b..1198d94 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegionServer.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegionServer.html @@ -1524,2213 +1524,2214 @@ 1516 // This call sets up an initialized replication and WAL. Later we start it up. 1517 setupWALAndReplication(); 1518 // Init in here rather than in constructor after thread name has been set -1519 this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); -1520 this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); -1521 // Now that we have a metrics source, start the pause monitor -1522 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); -1523 pauseMonitor.start(); -1524 -1525 // There is a rare case where we do NOT want services to start. Check config. -1526 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { -1527 startServices(); -1528 } -1529 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. -1530 // or make sense of it. -1531 startReplicationService(); -1532 +1519 this.metricsRegionServer = new MetricsRegionServer( +1520 new MetricsRegionServerWrapperImpl(this), conf); +1521 this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); +1522 // Now that we have a metrics source, start the pause monitor +1523 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); +1524 pauseMonitor.start(); +1525 +1526 // There is a rare case where we do NOT want services to start. Check config. +1527 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { +1528 startServices(); +1529 } +1530 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. +1531 // or make sense of it. +1532 startReplicationService(); 1533 -1534 // Set up ZK -1535 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + -1536 ", sessionid=0x" + -1537 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); -1538 -1539 // Wake up anyone waiting for this server to online -1540 synchronized (online) { -1541 online.set(true); -1542 online.notifyAll(); -1543 } -1544 } catch (Throwable e) { -1545 stop("Failed initialization"); -1546 throw convertThrowableToIOE(cleanup(e, "Failed init"), -1547 "Region server startup failed"); -1548 } finally { -1549 sleeper.skipSleepCycle(); -1550 } -1551 } -1552 -1553 protected void initializeMemStoreChunkCreator() { -1554 if (MemStoreLAB.isEnabled(conf)) { -1555 // MSLAB is enabled. So initialize MemStoreChunkPool -1556 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from -1557 // it. -1558 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf); -1559 long globalMemStoreSize = pair.getFirst(); -1560 boolean offheap = this.regionServerAccounting.isOffheap(); -1561 // When off heap memstore in use, take full area for chunk pool. -1562 float poolSizePercentage = offheap? 1.0F: -1563 conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); -1564 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, -1565 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); -1566 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); -1567 // init the chunkCreator -1568 ChunkCreator chunkCreator = -1569 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, -1570 initialCountPercentage, this.hMemManager); -1571 } -1572 } -1573 -1574 private void startHeapMemoryManager() { -1575 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this, -1576 this.regionServerAccounting); -1577 if (this.hMemManager != null) { -1578 this.hMemManager.start(getChoreService()); -1579 } -1580 } -1581 -1582 private void createMyEphemeralNode() throws KeeperException, IOException { -1583 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); -1584 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); -1585 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); -1586 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); -1587 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); -1588 } -1589 -1590 private void deleteMyEphemeralNode() throws KeeperException { -1591 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); -1592 } -1593 -1594 @Override -1595 public RegionServerAccounting getRegionServerAccounting() { -1596 return regionServerAccounting; -1597 } -1598 -1599 /* -1600 * @param r Region to get RegionLoad for. -1601 * @param regionLoadBldr the RegionLoad.Builder, can be null -1602 * @param regionSpecifier the RegionSpecifier.Builder, can be null -1603 * @return RegionLoad instance. -1604 * -1605 * @throws IOException -1606 */ -1607 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, -1608 RegionSpecifier.Builder regionSpecifier) throws IOException { -1609 byte[] name = r.getRegionInfo().getRegionName(); -1610 int stores = 0; -1611 int storefiles = 0; -1612 int storeUncompressedSizeMB = 0; -1613 int storefileSizeMB = 0; -1614 int memstoreSizeMB = (int) (r.getMemStoreSize() / 1024 / 1024); -1615 long storefileIndexSizeKB = 0; -1616 int rootIndexSizeKB = 0; -1617 int totalStaticIndexSizeKB = 0; -1618 int totalStaticBloomSizeKB = 0; -1619 long totalCompactingKVs = 0; -1620 long currentCompactedKVs = 0; -1621 List<HStore> storeList = r.getStores(); -1622 stores += storeList.size(); -1623 for (HStore store : storeList) { -1624 storefiles += store.getStorefilesCount(); -1625 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024); -1626 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); -1627 storefileIndexSizeKB += store.getStorefilesIndexSize() / 1024; -1628 CompactionProgress progress = store.getCompactionProgress(); -1629 if (progress != null) { -1630 totalCompactingKVs += progress.totalCompactingKVs; -1631 currentCompactedKVs += progress.currentCompactedKVs; -1632 } -1633 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024); -1634 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024); -1635 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024); -1636 } -1637 -1638 float dataLocality = -1639 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname()); -1640 if (regionLoadBldr == null) { -1641 regionLoadBldr = RegionLoad.newBuilder(); -1642 } -1643 if (regionSpecifier == null) { -1644 regionSpecifier = RegionSpecifier.newBuilder(); -1645 } -1646 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); -1647 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); -1648 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()) -1649 .setStores(stores) -1650 .setStorefiles(storefiles) -1651 .setStoreUncompressedSizeMB(storeUncompressedSizeMB) -1652 .setStorefileSizeMB(storefileSizeMB) -1653 .setMemStoreSizeMB(memstoreSizeMB) -1654 .setStorefileIndexSizeKB(storefileIndexSizeKB) -1655 .setRootIndexSizeKB(rootIndexSizeKB) -1656 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) -1657 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) -1658 .setReadRequestsCount(r.getReadRequestsCount()) -1659 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) -1660 .setWriteRequestsCount(r.getWriteRequestsCount()) -1661 .setTotalCompactingKVs(totalCompactingKVs) -1662 .setCurrentCompactedKVs(currentCompactedKVs) -1663 .setDataLocality(dataLocality) -1664 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); -1665 ((HRegion)r).setCompleteSequenceId(regionLoadBldr); -1666 -1667 return regionLoadBldr.build(); -1668 } -1669 -1670 /** -1671 * @param encodedRegionName -1672 * @return An instance of RegionLoad. -1673 */ -1674 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { -1675 HRegion r = onlineRegions.get(encodedRegionName); -1676 return r != null ? createRegionLoad(r, null, null) : null; -1677 } -1678 -1679 /* -1680 * Inner class that runs on a long period checking if regions need compaction. -1681 */ -1682 private static class CompactionChecker extends ScheduledChore { -1683 private final HRegionServer instance; -1684 private final int majorCompactPriority; -1685 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; -1686 //Iteration is 1-based rather than 0-based so we don't check for compaction -1687 // immediately upon region server startup -1688 private long iteration = 1; -1689 -1690 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { -1691 super("CompactionChecker", stopper, sleepTime); -1692 this.instance = h; -1693 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); -1694 -1695 /* MajorCompactPriority is configurable. -1696 * If not set, the compaction will use default priority. -1697 */ -1698 this.majorCompactPriority = this.instance.conf. -1699 getInt("hbase.regionserver.compactionChecker.majorCompactPriority", -1700 DEFAULT_PRIORITY); -1701 } -1702 -1703 @Override -1704 protected void chore() { -1705 for (Region r : this.instance.onlineRegions.values()) { -1706 if (r == null) { -1707 continue; -1708 } -1709 HRegion hr = (HRegion) r; -1710 for (HStore s : hr.stores.values()) { -1711 try { -1712 long multiplier = s.getCompactionCheckMultiplier(); -1713 assert multiplier > 0; -1714 if (iteration % multiplier != 0) { -1715 continue; -1716 } -1717 if (s.needsCompaction()) { -1718 // Queue a compaction. Will recognize if major is needed. -1719 this.instance.compactSplitThread.requestSystemCompaction(hr, s, -1720 getName() + " requests compaction"); -1721 } else if (s.shouldPerformMajorCompaction()) { -1722 s.triggerMajorCompaction(); -1723 if (majorCompactPriority == DEFAULT_PRIORITY || -1724 majorCompactPriority > hr.getCompactPriority()) { -1725 this.instance.compactSplitThread.requestCompaction(hr, s, -1726 getName() + " requests major compaction; use default priority", -1727 Store.NO_PRIORITY, -1728 CompactionLifeCycleTracker.DUMMY, null); -1729 } else { -1730 this.instance.compactSplitThread.requestCompaction(hr, s, -1731 getName() + " requests major compaction; use configured priority", -1732 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); -1733 } -1734 } -1735 } catch (IOException e) { -1736 LOG.warn("Failed major compaction check on " + r, e); -1737 } -1738 } -1739 } -1740 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); -1741 } -1742 } -1743 -1744 static class PeriodicMemStoreFlusher extends ScheduledChore { -1745 final HRegionServer server; -1746 final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds -1747 final static int MIN_DELAY_TIME = 0; // millisec -1748 public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { -1749 super("MemstoreFlusherChore", server, cacheFlushInterval); -1750 this.server = server; -1751 } -1752 -1753 @Override -1754 protected void chore() { -1755 final StringBuffer whyFlush = new StringBuffer(); -1756 for (HRegion r : this.server.onlineRegions.values()) { -1757 if (r == null) continue; -1758 if (r.shouldFlush(whyFlush)) { -1759 FlushRequester requester = server.getFlushRequester(); -1760 if (requester != null) { -1761 long randomDelay = RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME; -1762 LOG.info(getName() + " requesting flush of " + -1763 r.getRegionInfo().getRegionNameAsString() + " because " + -1764 whyFlush.toString() + -1765 " after random delay " + randomDelay + "ms"); -1766 //Throttle the flushes by putting a delay. If we don't throttle, and there -1767 //is a balanced write-load on the regions in a table, we might end up -1768 //overwhelming the filesystem with too many flushes at once. -1769 requester.requestDelayedFlush(r, randomDelay, false); -1770 } -1771 } -1772 } -1773 } -1774 } -1775 -1776 /** -1777 * Report the status of the server. A server is online once all the startup is -1778 * completed (setting up filesystem, starting executorService threads, etc.). This -1779 * method is designed mostly to be useful in tests. -1780 * -1781 * @return true if online, false if not. -1782 */ -1783 public boolean isOnline() { -1784 return online.get(); -1785 } -1786 -1787 /** -1788 * Setup WAL log and replication if enabled. -1789 * Replication setup is done in here because it wants to be hooked up to WAL. -1790 * -1791 * @throws IOException -1792 */ -1793 private void setupWALAndReplication() throws IOException { -1794 // TODO Replication make assumptions here based on the default filesystem impl -1795 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); -1796 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); -1797 -1798 Path logDir = new Path(walRootDir, logName); -1799 if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); -1800 if (this.walFs.exists(logDir)) { -1801 throw new RegionServerRunningException("Region server has already " + -1802 "created directory at " + this.serverName.toString()); -1803 } -1804 -1805 // Instantiate replication if replication enabled. Pass it the log directories. -1806 // In here we create the Replication instances. Later they are initialized and started up. -1807 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); -1808 -1809 // listeners the wal factory will add to wals it creates. -1810 List<WALActionsListener> listeners = new ArrayList<>(); -1811 listeners.add(new MetricsWAL()); -1812 if (this.replicationSourceHandler != null && -1813 this.replicationSourceHandler.getWALActionsListener() != null) { -1814 // Replication handler is an implementation of WALActionsListener. -1815 listeners.add(this.replicationSourceHandler.getWALActionsListener()); -1816 } -1817 -1818 // There is a cyclic dependency between ReplicationSourceHandler and WALFactory. -1819 // We use WALActionsListener to get the newly rolled WALs, so we need to get the -1820 // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then -1821 // ReplicationSourceHandler need to use WALFactory get the length of the wal file being written. -1822 // So we here we need to construct WALFactory first, and then pass it to the initialized method -1823 // of ReplicationSourceHandler. -1824 // TODO: I can't follow replication; it has initialize and then later on we start it! -1825 WALFactory factory = new WALFactory(conf, listeners, serverName.toString()); -1826 this.walFactory = factory; -1827 if (this.replicationSourceHandler != null) { -1828 this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory); -1829 } -1830 if (this.replicationSinkHandler != null && -1831 this.replicationSinkHandler != this.replicationSourceHandler) { -1832 this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory); -1833 } -1834 } -1835 -1836 /** -1837 * Start up replication source and sink handlers. -1838 * @throws IOException -1839 */ -1840 private void startReplicationService() throws IOException { -1841 if (this.replicationSourceHandler == this.replicationSinkHandler && -1842 this.replicationSourceHandler != null) { -1843 this.replicationSourceHandler.startReplicationService(); -1844 } else { -1845 if (this.replicationSourceHandler != null) { -1846 this.replicationSourceHandler.startReplicationService(); -1847 } -1848 if (this.replicationSinkHandler != null) { -1849 this.replicationSinkHandler.startReplicationService(); -1850 } -1851 } -1852 } -1853 +1534 +1535 // Set up ZK +1536 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + +1537 ", sessionid=0x" + +1538 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); +1539 +1540 // Wake up anyone waiting for this server to online +1541 synchronized (online) { +1542 online.set(true); +1543 online.notifyAll(); +1544 } +1545 } catch (Throwable e) { +1546 stop("Failed initialization"); +1547 throw convertThrowableToIOE(cleanup(e, "Failed init"), +1548 "Region server startup failed"); +1549 } finally { +1550 sleeper.skipSleepCycle(); +1551 } +1552 } +1553 +1554 protected void initializeMemStoreChunkCreator() { +1555 if (MemStoreLAB.isEnabled(conf)) { +1556 // MSLAB is enabled. So initialize MemStoreChunkPool +1557 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from +1558 // it. +1559 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf); +1560 long globalMemStoreSize = pair.getFirst(); +1561 boolean offheap = this.regionServerAccounting.isOffheap(); +1562 // When off heap memstore in use, take full area for chunk pool. +1563 float poolSizePercentage = offheap? 1.0F: +1564 conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); +1565 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, +1566 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); +1567 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); +1568 // init the chunkCreator +1569 ChunkCreator chunkCreator = +1570 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, +1571 initialCountPercentage, this.hMemManager); +1572 } +1573 } +1574 +1575 private void startHeapMemoryManager() { +1576 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this, +1577 this.regionServerAccounting); +1578 if (this.hMemManager != null) { +1579 this.hMemManager.start(getChoreService()); +1580 } +1581 } +1582 +1583 private void createMyEphemeralNode() throws KeeperException, IOException { +1584 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); +1585 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); +1586 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); +1587 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); +1588 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); +1589 } +1590 +1591 private void deleteMyEphemeralNode() throws KeeperException { +1592 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); +1593 } +1594 +1595 @Override +1596 public RegionServerAccounting getRegionServerAccounting() { +1597 return regionServerAccounting; +1598 } +1599 +1600 /* +1601 * @param r Region to get RegionLoad for. +1602 * @param regionLoadBldr the RegionLoad.Builder, can be null +1603 * @param regionSpecifier the RegionSpecifier.Builder, can be null +1604 * @return RegionLoad instance. +1605 * +1606 * @throws IOException +1607 */ +1608 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, +1609 RegionSpecifier.Builder regionSpecifier) throws IOException { +1610 byte[] name = r.getRegionInfo().getRegionName(); +1611 int stores = 0; +1612 int storefiles = 0; +1613 int storeUncompressedSizeMB = 0; +1614 int storefileSizeMB = 0; +1615 int memstoreSizeMB = (int) (r.getMemStoreSize() / 1024 / 1024); +1616 long storefileIndexSizeKB = 0; +1617 int rootIndexSizeKB = 0; +1618 int totalStaticIndexSizeKB = 0; +1619 int totalStaticBloomSizeKB = 0; +1620 long totalCompactingKVs = 0; +1621 long currentCompactedKVs = 0; +1622 List<HStore> storeList = r.getStores(); +1623 stores += storeList.size(); +1624 for (HStore store : storeList) { +1625 storefiles += store.getStorefilesCount(); +1626 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024); +1627 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); +1628 storefileIndexSizeKB += store.getStorefilesIndexSize() / 1024; +1629 CompactionProgress progress = store.getCompactionProgress(); +1630 if (progress != null) { +1631 totalCompactingKVs += progress.totalCompactingKVs; +1632 currentCompactedKVs += progress.currentCompactedKVs; +1633 } +1634 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024); +1635 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024); +1636 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024); +1637 } +1638 +1639 float dataLocality = +1640 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname()); +1641 if (regionLoadBldr == null) { +1642 regionLoadBldr = RegionLoad.newBuilder(); +1643 } +1644 if (regionSpecifier == null) { +1645 regionSpecifier = RegionSpecifier.newBuilder(); +1646 } +1647 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); +1648 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); +1649 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()) +1650 .setStores(stores) +1651 .setStorefiles(storefiles) +1652 .setStoreUncompressedSizeMB(storeUncompressedSizeMB) +1653 .setStorefileSizeMB(storefileSizeMB) +1654 .setMemStoreSizeMB(memstoreSizeMB) +1655 .setStorefileIndexSizeKB(storefileIndexSizeKB) +1656 .setRootIndexSizeKB(rootIndexSizeKB) +1657 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) +1658 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) +1659 .setReadRequestsCount(r.getReadRequestsCount()) +1660 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) +1661 .setWriteRequestsCount(r.getWriteRequestsCount()) +1662 .setTotalCompactingKVs(totalCompactingKVs) +1663 .setCurrentCompactedKVs(currentCompactedKVs) +1664 .setDataLocality(dataLocality) +1665 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); +1666 ((HRegion)r).setCompleteSequenceId(regionLoadBldr); +1667 +1668 return regionLoadBldr.build(); +1669 } +1670 +1671 /** +1672 * @param encodedRegionName +1673 * @return An instance of RegionLoad. +1674 */ +1675 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { +1676 HRegion r = onlineRegions.get(encodedRegionName); +1677 return r != null ? createRegionLoad(r, null, null) : null; +1678 } +1679 +1680 /* +1681 * Inner class that runs on a long period checking if regions need compaction. +1682 */ +1683 private static class CompactionChecker extends ScheduledChore { +1684 private final HRegionServer instance; +1685 private final int majorCompactPriority; +1686 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; +1687 //Iteration is 1-based rather than 0-based so we don't check for compaction +1688 // immediately upon region server startup +1689 private long iteration = 1; +1690 +1691 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { +1692 super("CompactionChecker", stopper, sleepTime); +1693 this.instance = h; +1694 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); +1695 +1696 /* MajorCompactPriority is configurable. +1697 * If not set, the compaction will use default priority. +1698 */ +1699 this.majorCompactPriority = this.instance.conf. +1700 getInt("hbase.regionserver.compactionChecker.majorCompactPriority", +1701 DEFAULT_PRIORITY); +1702 } +1703 +1704 @Override +1705 protected void chore() { +1706 for (Region r : this.instance.onlineRegions.values()) { +1707 if (r == null) { +1708 continue; +1709 } +1710 HRegion hr = (HRegion) r; +1711 for (HStore s : hr.stores.values()) { +1712 try { +1713 long multiplier = s.getCompactionCheckMultiplier(); +1714 assert multiplier > 0; +1715 if (iteration % multiplier != 0) { +1716 continue; +1717 } +1718 if (s.needsCompaction()) { +1719 // Queue a compaction. Will recognize if major is needed. +1720 this.instance.compactSplitThread.requestSystemCompaction(hr, s, +1721 getName() + " requests compaction"); +1722 } else if (s.shouldPerformMajorCompaction()) { +1723 s.triggerMajorCompaction(); +1724 if (majorCompactPriority == DEFAULT_PRIORITY || +1725 majorCompactPriority > hr.getCompactPriority()) { +1726 this.instance.compactSplitThread.requestCompaction(hr, s, +1727 getName() + " requests major compaction; use default priority", +1728 Store.NO_PRIORITY, +1729 CompactionLifeCycleTracker.DUMMY, null); +1730 } else { +1731 this.instance.compactSplitThread.requestCompaction(hr, s, +1732 getName() + " requests major compaction; use configured priority", +1733 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); +1734 } +1735 } +1736 } catch (IOException e) { +1737 LOG.warn("Failed major compaction check on " + r, e); +1738 } +1739 } +1740 } +1741 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); +1742 } +1743 } +1744 +1745 static class PeriodicMemStoreFlusher extends ScheduledChore { +1746 final HRegionServer server; +1747 final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds +1748 final static int MIN_DELAY_TIME = 0; // millisec +1749 public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { +1750 super("MemstoreFlusherChore", server, cacheFlushInterval); +1751 this.server = server; +1752 } +1753 +1754 @Override +1755 protected void chore() { +1756 final StringBuffer whyFlush = new StringBuffer(); +1757 for (HRegion r : this.server.onlineRegions.values()) { +1758 if (r == null) continue; +1759 if (r.shouldFlush(whyFlush)) { +1760 FlushRequester requester = server.getFlushRequester(); +1761 if (requester != null) { +1762 long randomDelay = RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME; +1763 LOG.info(getName() + " requesting flush of " + +1764 r.getRegionInfo().getRegionNameAsString() + " because " + +1765 whyFlush.toString() + +1766 " after random delay " + randomDelay + "ms"); +1767 //Throttle the flushes by putting a delay. If we don't throttle, and there +1768 //is a balanced write-load on the regions in a table, we might end up +1769 //overwhelming the filesystem with too many flushes at once. +1770 requester.requestDelayedFlush(r, randomDelay, false); +1771 } +1772 } +1773 } +1774 } +1775 } +1776 +1777 /** +1778 * Report the status of the server. A server is online once all the startup is +1779 * completed (setting up filesystem, starting executorService threads, etc.). This +1780 * method is designed mostly to be useful in tests. +1781 * +1782 * @return true if online, false if not. +1783 */ +1784 public boolean isOnline() { +1785 return online.get(); +1786 } +1787 +1788 /** +1789 * Setup WAL log and replication if enabled. +1790 * Replication setup is done in here because it wants to be hooked up to WAL. +1791 * +1792 * @throws IOException +1793 */ +1794 private void setupWALAndReplication() throws IOException { +1795 // TODO Replication make assumptions here based on the default filesystem impl +1796 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); +1797 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); +1798 +1799 Path logDir = new Path(walRootDir, logName); +1800 if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); +1801 if (this.walFs.exists(logDir)) { +1802 throw new RegionServerRunningException("Region server has already " + +1803 "created directory at " + this.serverName.toString()); +1804 } +1805 +1806 // Instantiate replication if replication enabled. Pass it the log directories. +1807 // In here we create the Replication instances. Later they are initialized and started up. +1808 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); +1809 +1810 // listeners the wal factory will add to wals it creates. +1811 List<WALActionsListener> listeners = new ArrayList<>(); +1812 listeners.add(new MetricsWAL()); +1813 if (this.replicationSourceHandler != null && +1814 this.replicationSourceHandler.getWALActionsListener() != null) { +1815 // Replication handler is an implementation of WALActionsListener. +1816 listeners.add(this.replicationSourceHandler.getWALActionsListener()); +1817 } +1818 +1819 // There is a cyclic dependency between ReplicationSourceHandler and WALFactory. +1820 // We use WALActionsListener to get the newly rolled WALs, so we need to get the +1821 // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then +1822 // ReplicationSourceHandler need to use WALFactory get the length of the wal file being written. +1823 // So we here we need to construct WALFactory first, and then pass it to the initialized method +1824 // of ReplicationSourceHandler. +1825 // TODO: I can't follow replication; it has initialize and then later on we start it! +1826 WALFactory factory = new WALFactory(conf, listeners, serverName.toString()); +1827 this.walFactory = factory; +1828 if (this.replicationSourceHandler != null) { +1829 this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory); +1830 } +1831 if (this.replicationSinkHandler != null && +1832 this.replicationSinkHandler != this.replicationSourceHandler) { +1833 this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory); +1834 } +1835 } +1836 +1837 /** +1838 * Start up replication source and sink handlers. +1839 * @throws IOException +1840 */ +1841 private void startReplicationService() throws IOException { +1842 if (this.replicationSourceHandler == this.replicationSinkHandler && +1843 this.replicationSourceHandler != null) { +1844 this.replicationSourceHandler.startReplicationService(); +1845 } else { +1846 if (this.replicationSourceHandler != null) { +1847 this.replicationSourceHandler.startReplicationService(); +1848 } +1849 if (this.replicationSinkHandler != null) { +1850 this.replicationSinkHandler.startReplicationService(); +1851 } +1852 } +1853 } 1854 -1855 public MetricsRegionServer getRegionServerMetrics() { -1856 return this.metricsRegionServer; -1857 } -1858 -1859 /** -1860 * @return Master address tracker instance. -1861 */ -1862 public MasterAddressTracker getMasterAddressTracker() { -1863 return this.masterAddressTracker; -1864 } -1865 -1866 /* -1867 * Start maintenance Threads, Server, Worker and lease checker threads. -1868 * Start all threads we need to run. This is called after we've successfully -1869 * registered with the Master. -1870 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we -1871 * get an unhandled exception. We cannot set the handler on all threads. -1872 * Server's internal Listener thread is off limits. For Server, if an OOME, it -1873 * waits a while then retries. Meantime, a flush or a compaction that tries to -1874 * run should trigger same critical condition and the shutdown will run. On -1875 * its way out, this server will shut down Server. Leases are sort of -1876 * inbetween. It has an internal thread that while it inherits from Chore, it -1877 * keeps its own internal stop mechanism so needs to be stopped by this -1878 * hosting server. Worker logs the exception and exits. -1879 */ -1880 private void startServices() throws IOException { -1881 if (!isStopped() && !isAborted()) { -1882 initializeThreads(); -1883 } -1884 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); -1885 this.secureBulkLoadManager.start(); -1886 -1887 // Health checker thread. -1888 if (isHealthCheckerConfigured()) { -1889 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, -1890 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); -1891 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); -1892 } -1893 -1894 this.walRoller = new LogRoller(this, this); -1895 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); -1896 -1897 // Create the CompactedFileDischarger chore executorService. This chore helps to -1898 // remove the compacted files -1899 // that will no longer be used in reads. -1900 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to -1901 // 2 mins so that compacted files can be archived before the TTLCleaner runs -1902 int cleanerInterval = -1903 conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); -1904 this.compactedFileDischarger = -1905 new CompactedHFilesDischarger(cleanerInterval, this, this); -1906 choreService.scheduleChore(compactedFileDischarger); -1907 -1908 // Start executor services -1909 this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, -1910 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); -1911 this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, -1912 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); -1913 this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, -1914 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); -1915 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, -1916 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); -1917 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, -1918 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); -1919 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { -1920 this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, -1921 conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); -1922 } -1923 this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( -1924 "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); -1925 // Start the threads for compacted files discharger -1926 this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, -1927 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); -1928 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { -1929 this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, -1930 conf.getInt("hbase.regionserver.region.replica.flusher.threads", -1931 conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); -1932 } -1933 -1934 Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", -1935 uncaughtExceptionHandler); -1936 this.cacheFlusher.start(uncaughtExceptionHandler); -1937 -1938 if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); -1939 if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); -1940 if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); -1941 if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); -1942 if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); -1943 if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); -1944 if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore); -1945 -1946 // Leases is not a Thread. Internally it runs a daemon thread. If it gets -1947 // an unhandled exception, it will just exit. -1948 Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", -1949 uncaughtExceptionHandler); -1950 -1951 // Create the log splitting worker and start it -1952 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for -1953 // quite a while inside Connection layer. The worker won't be available for other -1954 // tasks even after current task is preempted after a split task times out. -1955 Configuration sinkConf = HBaseConfiguration.create(conf); -1956 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, -1957 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds -1958 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, -1959 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds -1960 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); -1961 if (this.csm != null) { -1962 // SplitLogWorker needs csm. If none, don't start this. -1963 this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, -1964 this, walFactory); -1965 splitLogWorker.start(); -1966 } else { -1967 LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null"); -1968 } -1969 -1970 // Memstore services. -1971 startHeapMemoryManager(); -1972 // Call it after starting HeapMemoryManager. -1973 initializeMemStoreChunkCreator(); -1974 } -1975 -1976 private void initializeThreads() throws IOException { -1977 // Cache flushing thread. -1978 this.cacheFlusher = new MemStoreFlusher(conf, this); -1979 -1980 // Compaction thread -1981 this.compactSplitThread = new CompactSplit(this); -1982 -1983 // Background thread to check for compactions; needed if region has not gotten updates -1984 // in a while. It will take care of not checking too frequently on store-by-store basis. -1985 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); -1986 this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this); -1987 this.leases = new Leases(this.threadWakeFrequency); -1988 -1989 // Create the thread to clean the moved regions list -1990 movedRegionsCleaner = MovedRegionsCleaner.create(this); -1991 -1992 if (this.nonceManager != null) { -1993 // Create the scheduled chore that cleans up nonces. -1994 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); -1995 } -1996 -1997 // Setup the Quota Manager -1998 rsQuotaManager = new RegionServerRpcQuotaManager(this); -1999 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); -2000 -2001 if (QuotaUtil.isQuotaEnabled(conf)) { -2002 this.fsUtilizationChore = new FileSystemUtilizationChore(this); -2003 } -2004 +1855 +1856 public MetricsRegionServer getRegionServerMetrics() { +1857 return this.metricsRegionServer; +