From commits-return-65765-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Fri Jan 19 16:31:00 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 7788E180718 for ; Fri, 19 Jan 2018 16:31:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6799A160C4E; Fri, 19 Jan 2018 15:31:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 647AF160C27 for ; Fri, 19 Jan 2018 16:30:58 +0100 (CET) Received: (qmail 23179 invoked by uid 500); 19 Jan 2018 15:30:56 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 22951 invoked by uid 99); 19 Jan 2018 15:30:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jan 2018 15:30:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B5A9F4DC0; Fri, 19 Jan 2018 15:30:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Fri, 19 Jan 2018 15:31:23 -0000 Message-Id: <386c14f18007499680226e02585eb7fb@git.apache.org> In-Reply-To: <10e595f691db492d91eeeb92178d60e5@git.apache.org> References: <10e595f691db492d91eeeb92178d60e5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [32/51] [partial] hbase-site git commit: Published site at . http://git-wip-us.apache.org/repos/asf/hbase-site/blob/14db89d7/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html index b50a65f..7271567 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html @@ -1718,312 +1718,314 @@ 1710 1711 public WorkerThread(final ThreadGroup group) { 1712 super(group, "ProcExecWrkr-" + workerId.incrementAndGet()); -1713 } -1714 -1715 @Override -1716 public void sendStopSignal() { -1717 scheduler.signalAll(); -1718 } -1719 -1720 @Override -1721 public void run() { -1722 long lastUpdate = EnvironmentEdgeManager.currentTime(); -1723 try { -1724 while (isRunning() && keepAlive(lastUpdate)) { -1725 this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); -1726 if (this.activeProcedure == null) continue; -1727 int activeCount = activeExecutorCount.incrementAndGet(); -1728 int runningCount = store.setRunningProcedureCount(activeCount); -1729 if (LOG.isTraceEnabled()) { -1730 LOG.trace("Execute pid=" + this.activeProcedure.getProcId() + -1731 " runningCount=" + runningCount + ", activeCount=" + activeCount); -1732 } -1733 executionStartTime.set(EnvironmentEdgeManager.currentTime()); -1734 try { -1735 executeProcedure(this.activeProcedure); -1736 } catch (AssertionError e) { -1737 LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e); -1738 throw e; -1739 } finally { -1740 activeCount = activeExecutorCount.decrementAndGet(); -1741 runningCount = store.setRunningProcedureCount(activeCount); -1742 if (LOG.isTraceEnabled()) { -1743 LOG.trace("Halt pid=" + this.activeProcedure.getProcId() + -1744 " runningCount=" + runningCount + ", activeCount=" + activeCount); -1745 } -1746 this.activeProcedure = null; -1747 lastUpdate = EnvironmentEdgeManager.currentTime(); -1748 executionStartTime.set(Long.MAX_VALUE); -1749 } -1750 } -1751 } catch (Throwable t) { -1752 LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t); -1753 } finally { -1754 LOG.debug("Worker terminated."); -1755 } -1756 workerThreads.remove(this); -1757 } -1758 -1759 @Override -1760 public String toString() { -1761 Procedure<?> p = this.activeProcedure; -1762 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); -1763 } -1764 -1765 /** -1766 * @return the time since the current procedure is running -1767 */ -1768 public long getCurrentRunTime() { -1769 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); -1770 } -1771 -1772 private boolean keepAlive(final long lastUpdate) { -1773 if (workerThreads.size() <= corePoolSize) return true; -1774 return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime; -1775 } -1776 } -1777 -1778 /** -1779 * Runs task on a period such as check for stuck workers. -1780 * @see InlineChore -1781 */ -1782 private final class TimeoutExecutorThread extends StoppableThread { -1783 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); -1784 -1785 public TimeoutExecutorThread(final ThreadGroup group) { -1786 super(group, "ProcExecTimeout"); -1787 } -1788 -1789 @Override -1790 public void sendStopSignal() { -1791 queue.add(DelayedUtil.DELAYED_POISON); -1792 } -1793 -1794 @Override -1795 public void run() { -1796 final boolean traceEnabled = LOG.isTraceEnabled(); -1797 while (isRunning()) { -1798 final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); -1799 if (task == null || task == DelayedUtil.DELAYED_POISON) { -1800 // the executor may be shutting down, -1801 // and the task is just the shutdown request -1802 continue; -1803 } -1804 -1805 if (traceEnabled) { -1806 LOG.trace("Executing " + task); -1807 } -1808 -1809 // execute the task -1810 if (task instanceof InlineChore) { -1811 execInlineChore((InlineChore)task); -1812 } else if (task instanceof DelayedProcedure) { -1813 execDelayedProcedure((DelayedProcedure)task); -1814 } else { -1815 LOG.error("CODE-BUG unknown timeout task type " + task); -1816 } -1817 } -1818 } -1819 -1820 public void add(final InlineChore chore) { -1821 chore.refreshTimeout(); -1822 queue.add(chore); -1823 } -1824 -1825 public void add(final Procedure procedure) { -1826 assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; -1827 LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() + -1828 ", timestamp=" + procedure.getTimeoutTimestamp()); -1829 queue.add(new DelayedProcedure(procedure)); -1830 } -1831 -1832 public boolean remove(final Procedure procedure) { -1833 return queue.remove(new DelayedProcedure(procedure)); -1834 } -1835 -1836 private void execInlineChore(final InlineChore chore) { -1837 chore.run(); -1838 add(chore); -1839 } -1840 -1841 private void execDelayedProcedure(final DelayedProcedure delayed) { -1842 // TODO: treat this as a normal procedure, add it to the scheduler and -1843 // let one of the workers handle it. -1844 // Today we consider ProcedureInMemoryChore as InlineChores -1845 final Procedure procedure = delayed.getObject(); -1846 if (procedure instanceof ProcedureInMemoryChore) { -1847 executeInMemoryChore((ProcedureInMemoryChore)procedure); -1848 // if the procedure is in a waiting state again, put it back in the queue -1849 procedure.updateTimestamp(); -1850 if (procedure.isWaiting()) { -1851 delayed.setTimeout(procedure.getTimeoutTimestamp()); -1852 queue.add(delayed); -1853 } -1854 } else { -1855 executeTimedoutProcedure(procedure); -1856 } -1857 } -1858 -1859 private void executeInMemoryChore(final ProcedureInMemoryChore chore) { -1860 if (!chore.isWaiting()) return; -1861 -1862 // The ProcedureInMemoryChore is a special case, and it acts as a chore. -1863 // instead of bringing the Chore class in, we reuse this timeout thread for -1864 // this special case. -1865 try { -1866 chore.periodicExecute(getEnvironment()); -1867 } catch (Throwable e) { -1868 LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e); -1869 } -1870 } -1871 -1872 private void executeTimedoutProcedure(final Procedure proc) { -1873 // The procedure received a timeout. if the procedure itself does not handle it, -1874 // call abort() and add the procedure back in the queue for rollback. -1875 if (proc.setTimeoutFailure(getEnvironment())) { -1876 long rootProcId = Procedure.getRootProcedureId(procedures, proc); -1877 RootProcedureState procStack = rollbackStack.get(rootProcId); -1878 procStack.abort(); -1879 store.update(proc); -1880 scheduler.addFront(proc); -1881 } -1882 } -1883 } -1884 -1885 private static final class DelayedProcedure -1886 extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> { -1887 public DelayedProcedure(final Procedure procedure) { -1888 super(procedure, procedure.getTimeoutTimestamp()); -1889 } -1890 } -1891 -1892 private static abstract class StoppableThread extends Thread { -1893 public StoppableThread(final ThreadGroup group, final String name) { -1894 super(group, name); -1895 } -1896 -1897 public abstract void sendStopSignal(); +1713 setDaemon(true); +1714 } +1715 +1716 @Override +1717 public void sendStopSignal() { +1718 scheduler.signalAll(); +1719 } +1720 +1721 @Override +1722 public void run() { +1723 long lastUpdate = EnvironmentEdgeManager.currentTime(); +1724 try { +1725 while (isRunning() && keepAlive(lastUpdate)) { +1726 this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); +1727 if (this.activeProcedure == null) continue; +1728 int activeCount = activeExecutorCount.incrementAndGet(); +1729 int runningCount = store.setRunningProcedureCount(activeCount); +1730 if (LOG.isTraceEnabled()) { +1731 LOG.trace("Execute pid=" + this.activeProcedure.getProcId() + +1732 " runningCount=" + runningCount + ", activeCount=" + activeCount); +1733 } +1734 executionStartTime.set(EnvironmentEdgeManager.currentTime()); +1735 try { +1736 executeProcedure(this.activeProcedure); +1737 } catch (AssertionError e) { +1738 LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e); +1739 throw e; +1740 } finally { +1741 activeCount = activeExecutorCount.decrementAndGet(); +1742 runningCount = store.setRunningProcedureCount(activeCount); +1743 if (LOG.isTraceEnabled()) { +1744 LOG.trace("Halt pid=" + this.activeProcedure.getProcId() + +1745 " runningCount=" + runningCount + ", activeCount=" + activeCount); +1746 } +1747 this.activeProcedure = null; +1748 lastUpdate = EnvironmentEdgeManager.currentTime(); +1749 executionStartTime.set(Long.MAX_VALUE); +1750 } +1751 } +1752 } catch (Throwable t) { +1753 LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t); +1754 } finally { +1755 LOG.debug("Worker terminated."); +1756 } +1757 workerThreads.remove(this); +1758 } +1759 +1760 @Override +1761 public String toString() { +1762 Procedure<?> p = this.activeProcedure; +1763 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); +1764 } +1765 +1766 /** +1767 * @return the time since the current procedure is running +1768 */ +1769 public long getCurrentRunTime() { +1770 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); +1771 } +1772 +1773 private boolean keepAlive(final long lastUpdate) { +1774 if (workerThreads.size() <= corePoolSize) return true; +1775 return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime; +1776 } +1777 } +1778 +1779 /** +1780 * Runs task on a period such as check for stuck workers. +1781 * @see InlineChore +1782 */ +1783 private final class TimeoutExecutorThread extends StoppableThread { +1784 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); +1785 +1786 public TimeoutExecutorThread(final ThreadGroup group) { +1787 super(group, "ProcExecTimeout"); +1788 setDaemon(true); +1789 } +1790 +1791 @Override +1792 public void sendStopSignal() { +1793 queue.add(DelayedUtil.DELAYED_POISON); +1794 } +1795 +1796 @Override +1797 public void run() { +1798 final boolean traceEnabled = LOG.isTraceEnabled(); +1799 while (isRunning()) { +1800 final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); +1801 if (task == null || task == DelayedUtil.DELAYED_POISON) { +1802 // the executor may be shutting down, +1803 // and the task is just the shutdown request +1804 continue; +1805 } +1806 +1807 if (traceEnabled) { +1808 LOG.trace("Executing " + task); +1809 } +1810 +1811 // execute the task +1812 if (task instanceof InlineChore) { +1813 execInlineChore((InlineChore)task); +1814 } else if (task instanceof DelayedProcedure) { +1815 execDelayedProcedure((DelayedProcedure)task); +1816 } else { +1817 LOG.error("CODE-BUG unknown timeout task type " + task); +1818 } +1819 } +1820 } +1821 +1822 public void add(final InlineChore chore) { +1823 chore.refreshTimeout(); +1824 queue.add(chore); +1825 } +1826 +1827 public void add(final Procedure procedure) { +1828 assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; +1829 LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() + +1830 ", timestamp=" + procedure.getTimeoutTimestamp()); +1831 queue.add(new DelayedProcedure(procedure)); +1832 } +1833 +1834 public boolean remove(final Procedure procedure) { +1835 return queue.remove(new DelayedProcedure(procedure)); +1836 } +1837 +1838 private void execInlineChore(final InlineChore chore) { +1839 chore.run(); +1840 add(chore); +1841 } +1842 +1843 private void execDelayedProcedure(final DelayedProcedure delayed) { +1844 // TODO: treat this as a normal procedure, add it to the scheduler and +1845 // let one of the workers handle it. +1846 // Today we consider ProcedureInMemoryChore as InlineChores +1847 final Procedure procedure = delayed.getObject(); +1848 if (procedure instanceof ProcedureInMemoryChore) { +1849 executeInMemoryChore((ProcedureInMemoryChore)procedure); +1850 // if the procedure is in a waiting state again, put it back in the queue +1851 procedure.updateTimestamp(); +1852 if (procedure.isWaiting()) { +1853 delayed.setTimeout(procedure.getTimeoutTimestamp()); +1854 queue.add(delayed); +1855 } +1856 } else { +1857 executeTimedoutProcedure(procedure); +1858 } +1859 } +1860 +1861 private void executeInMemoryChore(final ProcedureInMemoryChore chore) { +1862 if (!chore.isWaiting()) return; +1863 +1864 // The ProcedureInMemoryChore is a special case, and it acts as a chore. +1865 // instead of bringing the Chore class in, we reuse this timeout thread for +1866 // this special case. +1867 try { +1868 chore.periodicExecute(getEnvironment()); +1869 } catch (Throwable e) { +1870 LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e); +1871 } +1872 } +1873 +1874 private void executeTimedoutProcedure(final Procedure proc) { +1875 // The procedure received a timeout. if the procedure itself does not handle it, +1876 // call abort() and add the procedure back in the queue for rollback. +1877 if (proc.setTimeoutFailure(getEnvironment())) { +1878 long rootProcId = Procedure.getRootProcedureId(procedures, proc); +1879 RootProcedureState procStack = rollbackStack.get(rootProcId); +1880 procStack.abort(); +1881 store.update(proc); +1882 scheduler.addFront(proc); +1883 } +1884 } +1885 } +1886 +1887 private static final class DelayedProcedure +1888 extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> { +1889 public DelayedProcedure(final Procedure procedure) { +1890 super(procedure, procedure.getTimeoutTimestamp()); +1891 } +1892 } +1893 +1894 private static abstract class StoppableThread extends Thread { +1895 public StoppableThread(final ThreadGroup group, final String name) { +1896 super(group, name); +1897 } 1898 -1899 public void awaitTermination() { -1900 try { -1901 final long startTime = EnvironmentEdgeManager.currentTime(); -1902 for (int i = 0; isAlive(); ++i) { -1903 sendStopSignal(); -1904 join(250); -1905 if (i > 0 && (i % 8) == 0) { -1906 LOG.warn("Waiting termination of thread " + getName() + ", " + -1907 StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); -1908 } -1909 } -1910 } catch (InterruptedException e) { -1911 LOG.warn(getName() + " join wait got interrupted", e); -1912 } -1913 } -1914 } -1915 -1916 // ========================================================================== -1917 // Inline Chores (executors internal chores) +1899 public abstract void sendStopSignal(); +1900 +1901 public void awaitTermination() { +1902 try { +1903 final long startTime = EnvironmentEdgeManager.currentTime(); +1904 for (int i = 0; isAlive(); ++i) { +1905 sendStopSignal(); +1906 join(250); +1907 if (i > 0 && (i % 8) == 0) { +1908 LOG.warn("Waiting termination of thread " + getName() + ", " + +1909 StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); +1910 } +1911 } +1912 } catch (InterruptedException e) { +1913 LOG.warn(getName() + " join wait got interrupted", e); +1914 } +1915 } +1916 } +1917 1918 // ========================================================================== -1919 private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable { -1920 private long timeout; -1921 -1922 public abstract int getTimeoutInterval(); +1919 // Inline Chores (executors internal chores) +1920 // ========================================================================== +1921 private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable { +1922 private long timeout; 1923 -1924 protected void refreshTimeout() { -1925 this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval(); -1926 } -1927 -1928 @Override -1929 public long getTimeout() { -1930 return timeout; -1931 } -1932 } -1933 -1934 // ---------------------------------------------------------------------------- -1935 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the -1936 // full set of procedures pending and completed to write a compacted -1937 // version of the log (in case is a log)? -1938 // In theory no, procedures are have a short life, so at some point the store -1939 // will have the tracker saying everything is in the last log. -1940 // ---------------------------------------------------------------------------- -1941 -1942 private final class WorkerMonitor extends InlineChore { -1943 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = -1944 "hbase.procedure.worker.monitor.interval.msec"; -1945 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec -1946 -1947 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = -1948 "hbase.procedure.worker.stuck.threshold.msec"; -1949 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec -1950 -1951 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = -1952 "hbase.procedure.worker.add.stuck.percentage"; -1953 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck -1954 -1955 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; -1956 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; -1957 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; -1958 -1959 public WorkerMonitor() { -1960 refreshConfig(); -1961 } -1962 -1963 @Override -1964 public void run() { -1965 final int stuckCount = checkForStuckWorkers(); -1966 checkThreadCount(stuckCount); -1967 -1968 // refresh interval (poor man dynamic conf update) -1969 refreshConfig(); -1970 } -1971 -1972 private int checkForStuckWorkers() { -1973 // check if any of the worker is stuck -1974 int stuckCount = 0; -1975 for (WorkerThread worker: workerThreads) { -1976 if (worker.getCurrentRunTime() < stuckThreshold) { -1977 continue; -1978 } -1979 -1980 // WARN the worker is stuck -1981 stuckCount++; -1982 LOG.warn("Worker stuck " + worker + -1983 " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime())); -1984 } -1985 return stuckCount; -1986 } -1987 -1988 private void checkThreadCount(final int stuckCount) { -1989 // nothing to do if there are no runnable tasks -1990 if (stuckCount < 1 || !scheduler.hasRunnables()) return; -1991 -1992 // add a new thread if the worker stuck percentage exceed the threshold limit -1993 // and every handler is active. -1994 final float stuckPerc = ((float)stuckCount) / workerThreads.size(); -1995 if (stuckPerc >= addWorkerStuckPercentage && -1996 activeExecutorCount.get() == workerThreads.size()) { -1997 final WorkerThread worker = new WorkerThread(threadGroup); -1998 workerThreads.add(worker); -1999 worker.start(); -2000 LOG.debug("Added new worker thread " + worker); -2001 } -2002 } -2003 -2004 private void refreshConfig() { -2005 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, -2006 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); -2007 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, -2008 DEFAULT_WORKER_MONITOR_INTERVAL); -2009 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, -2010 DEFAULT_WORKER_STUCK_THRESHOLD); -2011 } -2012 -2013 @Override -2014 public int getTimeoutInterval() { -2015 return timeoutInterval; -2016 } -2017 } -2018} +1924 public abstract int getTimeoutInterval(); +1925 +1926 protected void refreshTimeout() { +1927 this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval(); +1928 } +1929 +1930 @Override +1931 public long getTimeout() { +1932 return timeout; +1933 } +1934 } +1935 +1936 // ---------------------------------------------------------------------------- +1937 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the +1938 // full set of procedures pending and completed to write a compacted +1939 // version of the log (in case is a log)? +1940 // In theory no, procedures are have a short life, so at some point the store +1941 // will have the tracker saying everything is in the last log. +1942 // ---------------------------------------------------------------------------- +1943 +1944 private final class WorkerMonitor extends InlineChore { +1945 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = +1946 "hbase.procedure.worker.monitor.interval.msec"; +1947 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec +1948 +1949 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = +1950 "hbase.procedure.worker.stuck.threshold.msec"; +1951 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec +1952 +1953 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = +1954 "hbase.procedure.worker.add.stuck.percentage"; +1955 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck +1956 +1957 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; +1958 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; +1959 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; +1960 +1961 public WorkerMonitor() { +1962 refreshConfig(); +1963 } +1964 +1965 @Override +1966 public void run() { +1967 final int stuckCount = checkForStuckWorkers(); +1968 checkThreadCount(stuckCount); +1969 +1970 // refresh interval (poor man dynamic conf update) +1971 refreshConfig(); +1972 } +1973 +1974 private int checkForStuckWorkers() { +1975 // check if any of the worker is stuck +1976 int stuckCount = 0; +1977 for (WorkerThread worker: workerThreads) { +1978 if (worker.getCurrentRunTime() < stuckThreshold) { +1979 continue; +1980 } +1981 +1982 // WARN the worker is stuck +1983 stuckCount++; +1984 LOG.warn("Worker stuck " + worker + +1985 " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime())); +1986 } +1987 return stuckCount; +1988 } +1989 +1990 private void checkThreadCount(final int stuckCount) { +1991 // nothing to do if there are no runnable tasks +1992 if (stuckCount < 1 || !scheduler.hasRunnables()) return; +1993 +1994 // add a new thread if the worker stuck percentage exceed the threshold limit +1995 // and every handler is active. +1996 final float stuckPerc = ((float)stuckCount) / workerThreads.size(); +1997 if (stuckPerc >= addWorkerStuckPercentage && +1998 activeExecutorCount.get() == workerThreads.size()) { +1999 final WorkerThread worker = new WorkerThread(threadGroup); +2000 workerThreads.add(worker); +2001 worker.start(); +2002 LOG.debug("Added new worker thread " + worker); +2003 } +2004 } +2005 +2006 private void refreshConfig() { +2007 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, +2008 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); +2009 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, +2010 DEFAULT_WORKER_MONITOR_INTERVAL); +2011 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, +2012 DEFAULT_WORKER_STUCK_THRESHOLD); +2013 } +2014 +2015 @Override +2016 public int getTimeoutInterval() { +2017 return timeoutInterval; +2018 } +2019 } +2020}