hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From git-site-r...@apache.org
Subject [32/51] [partial] hbase-site git commit: Published site at .
Date Fri, 19 Jan 2018 15:31:23 GMT
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/14db89d7/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html
index b50a65f..7271567 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html
@@ -1718,312 +1718,314 @@
 <span class="sourceLineNo">1710</span><a name="line.1710"></a>
 <span class="sourceLineNo">1711</span>    public WorkerThread(final ThreadGroup group) {<a name="line.1711"></a>
 <span class="sourceLineNo">1712</span>      super(group, "ProcExecWrkr-" + workerId.incrementAndGet());<a name="line.1712"></a>
-<span class="sourceLineNo">1713</span>    }<a name="line.1713"></a>
-<span class="sourceLineNo">1714</span><a name="line.1714"></a>
-<span class="sourceLineNo">1715</span>    @Override<a name="line.1715"></a>
-<span class="sourceLineNo">1716</span>    public void sendStopSignal() {<a name="line.1716"></a>
-<span class="sourceLineNo">1717</span>      scheduler.signalAll();<a name="line.1717"></a>
-<span class="sourceLineNo">1718</span>    }<a name="line.1718"></a>
-<span class="sourceLineNo">1719</span><a name="line.1719"></a>
-<span class="sourceLineNo">1720</span>    @Override<a name="line.1720"></a>
-<span class="sourceLineNo">1721</span>    public void run() {<a name="line.1721"></a>
-<span class="sourceLineNo">1722</span>      long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1722"></a>
-<span class="sourceLineNo">1723</span>      try {<a name="line.1723"></a>
-<span class="sourceLineNo">1724</span>        while (isRunning() &amp;&amp; keepAlive(lastUpdate)) {<a name="line.1724"></a>
-<span class="sourceLineNo">1725</span>          this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1725"></a>
-<span class="sourceLineNo">1726</span>          if (this.activeProcedure == null) continue;<a name="line.1726"></a>
-<span class="sourceLineNo">1727</span>          int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1727"></a>
-<span class="sourceLineNo">1728</span>          int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1728"></a>
-<span class="sourceLineNo">1729</span>          if (LOG.isTraceEnabled()) {<a name="line.1729"></a>
-<span class="sourceLineNo">1730</span>            LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +<a name="line.1730"></a>
-<span class="sourceLineNo">1731</span>                " runningCount=" + runningCount + ", activeCount=" + activeCount);<a name="line.1731"></a>
-<span class="sourceLineNo">1732</span>          }<a name="line.1732"></a>
-<span class="sourceLineNo">1733</span>          executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1733"></a>
-<span class="sourceLineNo">1734</span>          try {<a name="line.1734"></a>
-<span class="sourceLineNo">1735</span>            executeProcedure(this.activeProcedure);<a name="line.1735"></a>
-<span class="sourceLineNo">1736</span>          } catch (AssertionError e) {<a name="line.1736"></a>
-<span class="sourceLineNo">1737</span>            LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);<a name="line.1737"></a>
-<span class="sourceLineNo">1738</span>            throw e;<a name="line.1738"></a>
-<span class="sourceLineNo">1739</span>          } finally {<a name="line.1739"></a>
-<span class="sourceLineNo">1740</span>            activeCount = activeExecutorCount.decrementAndGet();<a name="line.1740"></a>
-<span class="sourceLineNo">1741</span>            runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1741"></a>
-<span class="sourceLineNo">1742</span>            if (LOG.isTraceEnabled()) {<a name="line.1742"></a>
-<span class="sourceLineNo">1743</span>              LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +<a name="line.1743"></a>
-<span class="sourceLineNo">1744</span>                  " runningCount=" + runningCount + ", activeCount=" + activeCount);<a name="line.1744"></a>
-<span class="sourceLineNo">1745</span>            }<a name="line.1745"></a>
-<span class="sourceLineNo">1746</span>            this.activeProcedure = null;<a name="line.1746"></a>
-<span class="sourceLineNo">1747</span>            lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1747"></a>
-<span class="sourceLineNo">1748</span>            executionStartTime.set(Long.MAX_VALUE);<a name="line.1748"></a>
-<span class="sourceLineNo">1749</span>          }<a name="line.1749"></a>
-<span class="sourceLineNo">1750</span>        }<a name="line.1750"></a>
-<span class="sourceLineNo">1751</span>      } catch (Throwable t) {<a name="line.1751"></a>
-<span class="sourceLineNo">1752</span>        LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);<a name="line.1752"></a>
-<span class="sourceLineNo">1753</span>      } finally {<a name="line.1753"></a>
-<span class="sourceLineNo">1754</span>        LOG.debug("Worker terminated.");<a name="line.1754"></a>
-<span class="sourceLineNo">1755</span>      }<a name="line.1755"></a>
-<span class="sourceLineNo">1756</span>      workerThreads.remove(this);<a name="line.1756"></a>
-<span class="sourceLineNo">1757</span>    }<a name="line.1757"></a>
-<span class="sourceLineNo">1758</span><a name="line.1758"></a>
-<span class="sourceLineNo">1759</span>    @Override<a name="line.1759"></a>
-<span class="sourceLineNo">1760</span>    public String toString() {<a name="line.1760"></a>
-<span class="sourceLineNo">1761</span>      Procedure&lt;?&gt; p = this.activeProcedure;<a name="line.1761"></a>
-<span class="sourceLineNo">1762</span>      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1762"></a>
-<span class="sourceLineNo">1763</span>    }<a name="line.1763"></a>
-<span class="sourceLineNo">1764</span><a name="line.1764"></a>
-<span class="sourceLineNo">1765</span>    /**<a name="line.1765"></a>
-<span class="sourceLineNo">1766</span>     * @return the time since the current procedure is running<a name="line.1766"></a>
-<span class="sourceLineNo">1767</span>     */<a name="line.1767"></a>
-<span class="sourceLineNo">1768</span>    public long getCurrentRunTime() {<a name="line.1768"></a>
-<span class="sourceLineNo">1769</span>      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1769"></a>
-<span class="sourceLineNo">1770</span>    }<a name="line.1770"></a>
-<span class="sourceLineNo">1771</span><a name="line.1771"></a>
-<span class="sourceLineNo">1772</span>    private boolean keepAlive(final long lastUpdate) {<a name="line.1772"></a>
-<span class="sourceLineNo">1773</span>      if (workerThreads.size() &lt;= corePoolSize) return true;<a name="line.1773"></a>
-<span class="sourceLineNo">1774</span>      return (EnvironmentEdgeManager.currentTime() - lastUpdate) &lt; keepAliveTime;<a name="line.1774"></a>
-<span class="sourceLineNo">1775</span>    }<a name="line.1775"></a>
-<span class="sourceLineNo">1776</span>  }<a name="line.1776"></a>
-<span class="sourceLineNo">1777</span><a name="line.1777"></a>
-<span class="sourceLineNo">1778</span>  /**<a name="line.1778"></a>
-<span class="sourceLineNo">1779</span>   * Runs task on a period such as check for stuck workers.<a name="line.1779"></a>
-<span class="sourceLineNo">1780</span>   * @see InlineChore<a name="line.1780"></a>
-<span class="sourceLineNo">1781</span>   */<a name="line.1781"></a>
-<span class="sourceLineNo">1782</span>  private final class TimeoutExecutorThread extends StoppableThread {<a name="line.1782"></a>
-<span class="sourceLineNo">1783</span>    private final DelayQueue&lt;DelayedWithTimeout&gt; queue = new DelayQueue&lt;&gt;();<a name="line.1783"></a>
-<span class="sourceLineNo">1784</span><a name="line.1784"></a>
-<span class="sourceLineNo">1785</span>    public TimeoutExecutorThread(final ThreadGroup group) {<a name="line.1785"></a>
-<span class="sourceLineNo">1786</span>      super(group, "ProcExecTimeout");<a name="line.1786"></a>
-<span class="sourceLineNo">1787</span>    }<a name="line.1787"></a>
-<span class="sourceLineNo">1788</span><a name="line.1788"></a>
-<span class="sourceLineNo">1789</span>    @Override<a name="line.1789"></a>
-<span class="sourceLineNo">1790</span>    public void sendStopSignal() {<a name="line.1790"></a>
-<span class="sourceLineNo">1791</span>      queue.add(DelayedUtil.DELAYED_POISON);<a name="line.1791"></a>
-<span class="sourceLineNo">1792</span>    }<a name="line.1792"></a>
-<span class="sourceLineNo">1793</span><a name="line.1793"></a>
-<span class="sourceLineNo">1794</span>    @Override<a name="line.1794"></a>
-<span class="sourceLineNo">1795</span>    public void run() {<a name="line.1795"></a>
-<span class="sourceLineNo">1796</span>      final boolean traceEnabled = LOG.isTraceEnabled();<a name="line.1796"></a>
-<span class="sourceLineNo">1797</span>      while (isRunning()) {<a name="line.1797"></a>
-<span class="sourceLineNo">1798</span>        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);<a name="line.1798"></a>
-<span class="sourceLineNo">1799</span>        if (task == null || task == DelayedUtil.DELAYED_POISON) {<a name="line.1799"></a>
-<span class="sourceLineNo">1800</span>          // the executor may be shutting down,<a name="line.1800"></a>
-<span class="sourceLineNo">1801</span>          // and the task is just the shutdown request<a name="line.1801"></a>
-<span class="sourceLineNo">1802</span>          continue;<a name="line.1802"></a>
-<span class="sourceLineNo">1803</span>        }<a name="line.1803"></a>
-<span class="sourceLineNo">1804</span><a name="line.1804"></a>
-<span class="sourceLineNo">1805</span>        if (traceEnabled) {<a name="line.1805"></a>
-<span class="sourceLineNo">1806</span>          LOG.trace("Executing " + task);<a name="line.1806"></a>
-<span class="sourceLineNo">1807</span>        }<a name="line.1807"></a>
-<span class="sourceLineNo">1808</span><a name="line.1808"></a>
-<span class="sourceLineNo">1809</span>        // execute the task<a name="line.1809"></a>
-<span class="sourceLineNo">1810</span>        if (task instanceof InlineChore) {<a name="line.1810"></a>
-<span class="sourceLineNo">1811</span>          execInlineChore((InlineChore)task);<a name="line.1811"></a>
-<span class="sourceLineNo">1812</span>        } else if (task instanceof DelayedProcedure) {<a name="line.1812"></a>
-<span class="sourceLineNo">1813</span>          execDelayedProcedure((DelayedProcedure)task);<a name="line.1813"></a>
-<span class="sourceLineNo">1814</span>        } else {<a name="line.1814"></a>
-<span class="sourceLineNo">1815</span>          LOG.error("CODE-BUG unknown timeout task type " + task);<a name="line.1815"></a>
-<span class="sourceLineNo">1816</span>        }<a name="line.1816"></a>
-<span class="sourceLineNo">1817</span>      }<a name="line.1817"></a>
-<span class="sourceLineNo">1818</span>    }<a name="line.1818"></a>
-<span class="sourceLineNo">1819</span><a name="line.1819"></a>
-<span class="sourceLineNo">1820</span>    public void add(final InlineChore chore) {<a name="line.1820"></a>
-<span class="sourceLineNo">1821</span>      chore.refreshTimeout();<a name="line.1821"></a>
-<span class="sourceLineNo">1822</span>      queue.add(chore);<a name="line.1822"></a>
-<span class="sourceLineNo">1823</span>    }<a name="line.1823"></a>
-<span class="sourceLineNo">1824</span><a name="line.1824"></a>
-<span class="sourceLineNo">1825</span>    public void add(final Procedure procedure) {<a name="line.1825"></a>
-<span class="sourceLineNo">1826</span>      assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;<a name="line.1826"></a>
-<span class="sourceLineNo">1827</span>      LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +<a name="line.1827"></a>
-<span class="sourceLineNo">1828</span>          ", timestamp=" + procedure.getTimeoutTimestamp());<a name="line.1828"></a>
-<span class="sourceLineNo">1829</span>      queue.add(new DelayedProcedure(procedure));<a name="line.1829"></a>
-<span class="sourceLineNo">1830</span>    }<a name="line.1830"></a>
-<span class="sourceLineNo">1831</span><a name="line.1831"></a>
-<span class="sourceLineNo">1832</span>    public boolean remove(final Procedure procedure) {<a name="line.1832"></a>
-<span class="sourceLineNo">1833</span>      return queue.remove(new DelayedProcedure(procedure));<a name="line.1833"></a>
-<span class="sourceLineNo">1834</span>    }<a name="line.1834"></a>
-<span class="sourceLineNo">1835</span><a name="line.1835"></a>
-<span class="sourceLineNo">1836</span>    private void execInlineChore(final InlineChore chore) {<a name="line.1836"></a>
-<span class="sourceLineNo">1837</span>      chore.run();<a name="line.1837"></a>
-<span class="sourceLineNo">1838</span>      add(chore);<a name="line.1838"></a>
-<span class="sourceLineNo">1839</span>    }<a name="line.1839"></a>
-<span class="sourceLineNo">1840</span><a name="line.1840"></a>
-<span class="sourceLineNo">1841</span>    private void execDelayedProcedure(final DelayedProcedure delayed) {<a name="line.1841"></a>
-<span class="sourceLineNo">1842</span>      // TODO: treat this as a normal procedure, add it to the scheduler and<a name="line.1842"></a>
-<span class="sourceLineNo">1843</span>      // let one of the workers handle it.<a name="line.1843"></a>
-<span class="sourceLineNo">1844</span>      // Today we consider ProcedureInMemoryChore as InlineChores<a name="line.1844"></a>
-<span class="sourceLineNo">1845</span>      final Procedure procedure = delayed.getObject();<a name="line.1845"></a>
-<span class="sourceLineNo">1846</span>      if (procedure instanceof ProcedureInMemoryChore) {<a name="line.1846"></a>
-<span class="sourceLineNo">1847</span>        executeInMemoryChore((ProcedureInMemoryChore)procedure);<a name="line.1847"></a>
-<span class="sourceLineNo">1848</span>        // if the procedure is in a waiting state again, put it back in the queue<a name="line.1848"></a>
-<span class="sourceLineNo">1849</span>        procedure.updateTimestamp();<a name="line.1849"></a>
-<span class="sourceLineNo">1850</span>        if (procedure.isWaiting()) {<a name="line.1850"></a>
-<span class="sourceLineNo">1851</span>          delayed.setTimeout(procedure.getTimeoutTimestamp());<a name="line.1851"></a>
-<span class="sourceLineNo">1852</span>          queue.add(delayed);<a name="line.1852"></a>
-<span class="sourceLineNo">1853</span>        }<a name="line.1853"></a>
-<span class="sourceLineNo">1854</span>      } else {<a name="line.1854"></a>
-<span class="sourceLineNo">1855</span>        executeTimedoutProcedure(procedure);<a name="line.1855"></a>
-<span class="sourceLineNo">1856</span>      }<a name="line.1856"></a>
-<span class="sourceLineNo">1857</span>    }<a name="line.1857"></a>
-<span class="sourceLineNo">1858</span><a name="line.1858"></a>
-<span class="sourceLineNo">1859</span>    private void executeInMemoryChore(final ProcedureInMemoryChore chore) {<a name="line.1859"></a>
-<span class="sourceLineNo">1860</span>      if (!chore.isWaiting()) return;<a name="line.1860"></a>
-<span class="sourceLineNo">1861</span><a name="line.1861"></a>
-<span class="sourceLineNo">1862</span>      // The ProcedureInMemoryChore is a special case, and it acts as a chore.<a name="line.1862"></a>
-<span class="sourceLineNo">1863</span>      // instead of bringing the Chore class in, we reuse this timeout thread for<a name="line.1863"></a>
-<span class="sourceLineNo">1864</span>      // this special case.<a name="line.1864"></a>
-<span class="sourceLineNo">1865</span>      try {<a name="line.1865"></a>
-<span class="sourceLineNo">1866</span>        chore.periodicExecute(getEnvironment());<a name="line.1866"></a>
-<span class="sourceLineNo">1867</span>      } catch (Throwable e) {<a name="line.1867"></a>
-<span class="sourceLineNo">1868</span>        LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e);<a name="line.1868"></a>
-<span class="sourceLineNo">1869</span>      }<a name="line.1869"></a>
-<span class="sourceLineNo">1870</span>    }<a name="line.1870"></a>
-<span class="sourceLineNo">1871</span><a name="line.1871"></a>
-<span class="sourceLineNo">1872</span>    private void executeTimedoutProcedure(final Procedure proc) {<a name="line.1872"></a>
-<span class="sourceLineNo">1873</span>      // The procedure received a timeout. if the procedure itself does not handle it,<a name="line.1873"></a>
-<span class="sourceLineNo">1874</span>      // call abort() and add the procedure back in the queue for rollback.<a name="line.1874"></a>
-<span class="sourceLineNo">1875</span>      if (proc.setTimeoutFailure(getEnvironment())) {<a name="line.1875"></a>
-<span class="sourceLineNo">1876</span>        long rootProcId = Procedure.getRootProcedureId(procedures, proc);<a name="line.1876"></a>
-<span class="sourceLineNo">1877</span>        RootProcedureState procStack = rollbackStack.get(rootProcId);<a name="line.1877"></a>
-<span class="sourceLineNo">1878</span>        procStack.abort();<a name="line.1878"></a>
-<span class="sourceLineNo">1879</span>        store.update(proc);<a name="line.1879"></a>
-<span class="sourceLineNo">1880</span>        scheduler.addFront(proc);<a name="line.1880"></a>
-<span class="sourceLineNo">1881</span>      }<a name="line.1881"></a>
-<span class="sourceLineNo">1882</span>    }<a name="line.1882"></a>
-<span class="sourceLineNo">1883</span>  }<a name="line.1883"></a>
-<span class="sourceLineNo">1884</span><a name="line.1884"></a>
-<span class="sourceLineNo">1885</span>  private static final class DelayedProcedure<a name="line.1885"></a>
-<span class="sourceLineNo">1886</span>      extends DelayedUtil.DelayedContainerWithTimestamp&lt;Procedure&gt; {<a name="line.1886"></a>
-<span class="sourceLineNo">1887</span>    public DelayedProcedure(final Procedure procedure) {<a name="line.1887"></a>
-<span class="sourceLineNo">1888</span>      super(procedure, procedure.getTimeoutTimestamp());<a name="line.1888"></a>
-<span class="sourceLineNo">1889</span>    }<a name="line.1889"></a>
-<span class="sourceLineNo">1890</span>  }<a name="line.1890"></a>
-<span class="sourceLineNo">1891</span><a name="line.1891"></a>
-<span class="sourceLineNo">1892</span>  private static abstract class StoppableThread extends Thread {<a name="line.1892"></a>
-<span class="sourceLineNo">1893</span>    public StoppableThread(final ThreadGroup group, final String name) {<a name="line.1893"></a>
-<span class="sourceLineNo">1894</span>      super(group, name);<a name="line.1894"></a>
-<span class="sourceLineNo">1895</span>    }<a name="line.1895"></a>
-<span class="sourceLineNo">1896</span><a name="line.1896"></a>
-<span class="sourceLineNo">1897</span>    public abstract void sendStopSignal();<a name="line.1897"></a>
+<span class="sourceLineNo">1713</span>      setDaemon(true);<a name="line.1713"></a>
+<span class="sourceLineNo">1714</span>    }<a name="line.1714"></a>
+<span class="sourceLineNo">1715</span><a name="line.1715"></a>
+<span class="sourceLineNo">1716</span>    @Override<a name="line.1716"></a>
+<span class="sourceLineNo">1717</span>    public void sendStopSignal() {<a name="line.1717"></a>
+<span class="sourceLineNo">1718</span>      scheduler.signalAll();<a name="line.1718"></a>
+<span class="sourceLineNo">1719</span>    }<a name="line.1719"></a>
+<span class="sourceLineNo">1720</span><a name="line.1720"></a>
+<span class="sourceLineNo">1721</span>    @Override<a name="line.1721"></a>
+<span class="sourceLineNo">1722</span>    public void run() {<a name="line.1722"></a>
+<span class="sourceLineNo">1723</span>      long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1723"></a>
+<span class="sourceLineNo">1724</span>      try {<a name="line.1724"></a>
+<span class="sourceLineNo">1725</span>        while (isRunning() &amp;&amp; keepAlive(lastUpdate)) {<a name="line.1725"></a>
+<span class="sourceLineNo">1726</span>          this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1726"></a>
+<span class="sourceLineNo">1727</span>          if (this.activeProcedure == null) continue;<a name="line.1727"></a>
+<span class="sourceLineNo">1728</span>          int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1728"></a>
+<span class="sourceLineNo">1729</span>          int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1729"></a>
+<span class="sourceLineNo">1730</span>          if (LOG.isTraceEnabled()) {<a name="line.1730"></a>
+<span class="sourceLineNo">1731</span>            LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +<a name="line.1731"></a>
+<span class="sourceLineNo">1732</span>                " runningCount=" + runningCount + ", activeCount=" + activeCount);<a name="line.1732"></a>
+<span class="sourceLineNo">1733</span>          }<a name="line.1733"></a>
+<span class="sourceLineNo">1734</span>          executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1734"></a>
+<span class="sourceLineNo">1735</span>          try {<a name="line.1735"></a>
+<span class="sourceLineNo">1736</span>            executeProcedure(this.activeProcedure);<a name="line.1736"></a>
+<span class="sourceLineNo">1737</span>          } catch (AssertionError e) {<a name="line.1737"></a>
+<span class="sourceLineNo">1738</span>            LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);<a name="line.1738"></a>
+<span class="sourceLineNo">1739</span>            throw e;<a name="line.1739"></a>
+<span class="sourceLineNo">1740</span>          } finally {<a name="line.1740"></a>
+<span class="sourceLineNo">1741</span>            activeCount = activeExecutorCount.decrementAndGet();<a name="line.1741"></a>
+<span class="sourceLineNo">1742</span>            runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1742"></a>
+<span class="sourceLineNo">1743</span>            if (LOG.isTraceEnabled()) {<a name="line.1743"></a>
+<span class="sourceLineNo">1744</span>              LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +<a name="line.1744"></a>
+<span class="sourceLineNo">1745</span>                  " runningCount=" + runningCount + ", activeCount=" + activeCount);<a name="line.1745"></a>
+<span class="sourceLineNo">1746</span>            }<a name="line.1746"></a>
+<span class="sourceLineNo">1747</span>            this.activeProcedure = null;<a name="line.1747"></a>
+<span class="sourceLineNo">1748</span>            lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1748"></a>
+<span class="sourceLineNo">1749</span>            executionStartTime.set(Long.MAX_VALUE);<a name="line.1749"></a>
+<span class="sourceLineNo">1750</span>          }<a name="line.1750"></a>
+<span class="sourceLineNo">1751</span>        }<a name="line.1751"></a>
+<span class="sourceLineNo">1752</span>      } catch (Throwable t) {<a name="line.1752"></a>
+<span class="sourceLineNo">1753</span>        LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);<a name="line.1753"></a>
+<span class="sourceLineNo">1754</span>      } finally {<a name="line.1754"></a>
+<span class="sourceLineNo">1755</span>        LOG.debug("Worker terminated.");<a name="line.1755"></a>
+<span class="sourceLineNo">1756</span>      }<a name="line.1756"></a>
+<span class="sourceLineNo">1757</span>      workerThreads.remove(this);<a name="line.1757"></a>
+<span class="sourceLineNo">1758</span>    }<a name="line.1758"></a>
+<span class="sourceLineNo">1759</span><a name="line.1759"></a>
+<span class="sourceLineNo">1760</span>    @Override<a name="line.1760"></a>
+<span class="sourceLineNo">1761</span>    public String toString() {<a name="line.1761"></a>
+<span class="sourceLineNo">1762</span>      Procedure&lt;?&gt; p = this.activeProcedure;<a name="line.1762"></a>
+<span class="sourceLineNo">1763</span>      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1763"></a>
+<span class="sourceLineNo">1764</span>    }<a name="line.1764"></a>
+<span class="sourceLineNo">1765</span><a name="line.1765"></a>
+<span class="sourceLineNo">1766</span>    /**<a name="line.1766"></a>
+<span class="sourceLineNo">1767</span>     * @return the time since the current procedure is running<a name="line.1767"></a>
+<span class="sourceLineNo">1768</span>     */<a name="line.1768"></a>
+<span class="sourceLineNo">1769</span>    public long getCurrentRunTime() {<a name="line.1769"></a>
+<span class="sourceLineNo">1770</span>      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1770"></a>
+<span class="sourceLineNo">1771</span>    }<a name="line.1771"></a>
+<span class="sourceLineNo">1772</span><a name="line.1772"></a>
+<span class="sourceLineNo">1773</span>    private boolean keepAlive(final long lastUpdate) {<a name="line.1773"></a>
+<span class="sourceLineNo">1774</span>      if (workerThreads.size() &lt;= corePoolSize) return true;<a name="line.1774"></a>
+<span class="sourceLineNo">1775</span>      return (EnvironmentEdgeManager.currentTime() - lastUpdate) &lt; keepAliveTime;<a name="line.1775"></a>
+<span class="sourceLineNo">1776</span>    }<a name="line.1776"></a>
+<span class="sourceLineNo">1777</span>  }<a name="line.1777"></a>
+<span class="sourceLineNo">1778</span><a name="line.1778"></a>
+<span class="sourceLineNo">1779</span>  /**<a name="line.1779"></a>
+<span class="sourceLineNo">1780</span>   * Runs task on a period such as check for stuck workers.<a name="line.1780"></a>
+<span class="sourceLineNo">1781</span>   * @see InlineChore<a name="line.1781"></a>
+<span class="sourceLineNo">1782</span>   */<a name="line.1782"></a>
+<span class="sourceLineNo">1783</span>  private final class TimeoutExecutorThread extends StoppableThread {<a name="line.1783"></a>
+<span class="sourceLineNo">1784</span>    private final DelayQueue&lt;DelayedWithTimeout&gt; queue = new DelayQueue&lt;&gt;();<a name="line.1784"></a>
+<span class="sourceLineNo">1785</span><a name="line.1785"></a>
+<span class="sourceLineNo">1786</span>    public TimeoutExecutorThread(final ThreadGroup group) {<a name="line.1786"></a>
+<span class="sourceLineNo">1787</span>      super(group, "ProcExecTimeout");<a name="line.1787"></a>
+<span class="sourceLineNo">1788</span>      setDaemon(true);<a name="line.1788"></a>
+<span class="sourceLineNo">1789</span>    }<a name="line.1789"></a>
+<span class="sourceLineNo">1790</span><a name="line.1790"></a>
+<span class="sourceLineNo">1791</span>    @Override<a name="line.1791"></a>
+<span class="sourceLineNo">1792</span>    public void sendStopSignal() {<a name="line.1792"></a>
+<span class="sourceLineNo">1793</span>      queue.add(DelayedUtil.DELAYED_POISON);<a name="line.1793"></a>
+<span class="sourceLineNo">1794</span>    }<a name="line.1794"></a>
+<span class="sourceLineNo">1795</span><a name="line.1795"></a>
+<span class="sourceLineNo">1796</span>    @Override<a name="line.1796"></a>
+<span class="sourceLineNo">1797</span>    public void run() {<a name="line.1797"></a>
+<span class="sourceLineNo">1798</span>      final boolean traceEnabled = LOG.isTraceEnabled();<a name="line.1798"></a>
+<span class="sourceLineNo">1799</span>      while (isRunning()) {<a name="line.1799"></a>
+<span class="sourceLineNo">1800</span>        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);<a name="line.1800"></a>
+<span class="sourceLineNo">1801</span>        if (task == null || task == DelayedUtil.DELAYED_POISON) {<a name="line.1801"></a>
+<span class="sourceLineNo">1802</span>          // the executor may be shutting down,<a name="line.1802"></a>
+<span class="sourceLineNo">1803</span>          // and the task is just the shutdown request<a name="line.1803"></a>
+<span class="sourceLineNo">1804</span>          continue;<a name="line.1804"></a>
+<span class="sourceLineNo">1805</span>        }<a name="line.1805"></a>
+<span class="sourceLineNo">1806</span><a name="line.1806"></a>
+<span class="sourceLineNo">1807</span>        if (traceEnabled) {<a name="line.1807"></a>
+<span class="sourceLineNo">1808</span>          LOG.trace("Executing " + task);<a name="line.1808"></a>
+<span class="sourceLineNo">1809</span>        }<a name="line.1809"></a>
+<span class="sourceLineNo">1810</span><a name="line.1810"></a>
+<span class="sourceLineNo">1811</span>        // execute the task<a name="line.1811"></a>
+<span class="sourceLineNo">1812</span>        if (task instanceof InlineChore) {<a name="line.1812"></a>
+<span class="sourceLineNo">1813</span>          execInlineChore((InlineChore)task);<a name="line.1813"></a>
+<span class="sourceLineNo">1814</span>        } else if (task instanceof DelayedProcedure) {<a name="line.1814"></a>
+<span class="sourceLineNo">1815</span>          execDelayedProcedure((DelayedProcedure)task);<a name="line.1815"></a>
+<span class="sourceLineNo">1816</span>        } else {<a name="line.1816"></a>
+<span class="sourceLineNo">1817</span>          LOG.error("CODE-BUG unknown timeout task type " + task);<a name="line.1817"></a>
+<span class="sourceLineNo">1818</span>        }<a name="line.1818"></a>
+<span class="sourceLineNo">1819</span>      }<a name="line.1819"></a>
+<span class="sourceLineNo">1820</span>    }<a name="line.1820"></a>
+<span class="sourceLineNo">1821</span><a name="line.1821"></a>
+<span class="sourceLineNo">1822</span>    public void add(final InlineChore chore) {<a name="line.1822"></a>
+<span class="sourceLineNo">1823</span>      chore.refreshTimeout();<a name="line.1823"></a>
+<span class="sourceLineNo">1824</span>      queue.add(chore);<a name="line.1824"></a>
+<span class="sourceLineNo">1825</span>    }<a name="line.1825"></a>
+<span class="sourceLineNo">1826</span><a name="line.1826"></a>
+<span class="sourceLineNo">1827</span>    public void add(final Procedure procedure) {<a name="line.1827"></a>
+<span class="sourceLineNo">1828</span>      assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;<a name="line.1828"></a>
+<span class="sourceLineNo">1829</span>      LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +<a name="line.1829"></a>
+<span class="sourceLineNo">1830</span>          ", timestamp=" + procedure.getTimeoutTimestamp());<a name="line.1830"></a>
+<span class="sourceLineNo">1831</span>      queue.add(new DelayedProcedure(procedure));<a name="line.1831"></a>
+<span class="sourceLineNo">1832</span>    }<a name="line.1832"></a>
+<span class="sourceLineNo">1833</span><a name="line.1833"></a>
+<span class="sourceLineNo">1834</span>    public boolean remove(final Procedure procedure) {<a name="line.1834"></a>
+<span class="sourceLineNo">1835</span>      return queue.remove(new DelayedProcedure(procedure));<a name="line.1835"></a>
+<span class="sourceLineNo">1836</span>    }<a name="line.1836"></a>
+<span class="sourceLineNo">1837</span><a name="line.1837"></a>
+<span class="sourceLineNo">1838</span>    private void execInlineChore(final InlineChore chore) {<a name="line.1838"></a>
+<span class="sourceLineNo">1839</span>      chore.run();<a name="line.1839"></a>
+<span class="sourceLineNo">1840</span>      add(chore);<a name="line.1840"></a>
+<span class="sourceLineNo">1841</span>    }<a name="line.1841"></a>
+<span class="sourceLineNo">1842</span><a name="line.1842"></a>
+<span class="sourceLineNo">1843</span>    private void execDelayedProcedure(final DelayedProcedure delayed) {<a name="line.1843"></a>
+<span class="sourceLineNo">1844</span>      // TODO: treat this as a normal procedure, add it to the scheduler and<a name="line.1844"></a>
+<span class="sourceLineNo">1845</span>      // let one of the workers handle it.<a name="line.1845"></a>
+<span class="sourceLineNo">1846</span>      // Today we consider ProcedureInMemoryChore as InlineChores<a name="line.1846"></a>
+<span class="sourceLineNo">1847</span>      final Procedure procedure = delayed.getObject();<a name="line.1847"></a>
+<span class="sourceLineNo">1848</span>      if (procedure instanceof ProcedureInMemoryChore) {<a name="line.1848"></a>
+<span class="sourceLineNo">1849</span>        executeInMemoryChore((ProcedureInMemoryChore)procedure);<a name="line.1849"></a>
+<span class="sourceLineNo">1850</span>        // if the procedure is in a waiting state again, put it back in the queue<a name="line.1850"></a>
+<span class="sourceLineNo">1851</span>        procedure.updateTimestamp();<a name="line.1851"></a>
+<span class="sourceLineNo">1852</span>        if (procedure.isWaiting()) {<a name="line.1852"></a>
+<span class="sourceLineNo">1853</span>          delayed.setTimeout(procedure.getTimeoutTimestamp());<a name="line.1853"></a>
+<span class="sourceLineNo">1854</span>          queue.add(delayed);<a name="line.1854"></a>
+<span class="sourceLineNo">1855</span>        }<a name="line.1855"></a>
+<span class="sourceLineNo">1856</span>      } else {<a name="line.1856"></a>
+<span class="sourceLineNo">1857</span>        executeTimedoutProcedure(procedure);<a name="line.1857"></a>
+<span class="sourceLineNo">1858</span>      }<a name="line.1858"></a>
+<span class="sourceLineNo">1859</span>    }<a name="line.1859"></a>
+<span class="sourceLineNo">1860</span><a name="line.1860"></a>
+<span class="sourceLineNo">1861</span>    private void executeInMemoryChore(final ProcedureInMemoryChore chore) {<a name="line.1861"></a>
+<span class="sourceLineNo">1862</span>      if (!chore.isWaiting()) return;<a name="line.1862"></a>
+<span class="sourceLineNo">1863</span><a name="line.1863"></a>
+<span class="sourceLineNo">1864</span>      // The ProcedureInMemoryChore is a special case, and it acts as a chore.<a name="line.1864"></a>
+<span class="sourceLineNo">1865</span>      // instead of bringing the Chore class in, we reuse this timeout thread for<a name="line.1865"></a>
+<span class="sourceLineNo">1866</span>      // this special case.<a name="line.1866"></a>
+<span class="sourceLineNo">1867</span>      try {<a name="line.1867"></a>
+<span class="sourceLineNo">1868</span>        chore.periodicExecute(getEnvironment());<a name="line.1868"></a>
+<span class="sourceLineNo">1869</span>      } catch (Throwable e) {<a name="line.1869"></a>
+<span class="sourceLineNo">1870</span>        LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e);<a name="line.1870"></a>
+<span class="sourceLineNo">1871</span>      }<a name="line.1871"></a>
+<span class="sourceLineNo">1872</span>    }<a name="line.1872"></a>
+<span class="sourceLineNo">1873</span><a name="line.1873"></a>
+<span class="sourceLineNo">1874</span>    private void executeTimedoutProcedure(final Procedure proc) {<a name="line.1874"></a>
+<span class="sourceLineNo">1875</span>      // The procedure received a timeout. if the procedure itself does not handle it,<a name="line.1875"></a>
+<span class="sourceLineNo">1876</span>      // call abort() and add the procedure back in the queue for rollback.<a name="line.1876"></a>
+<span class="sourceLineNo">1877</span>      if (proc.setTimeoutFailure(getEnvironment())) {<a name="line.1877"></a>
+<span class="sourceLineNo">1878</span>        long rootProcId = Procedure.getRootProcedureId(procedures, proc);<a name="line.1878"></a>
+<span class="sourceLineNo">1879</span>        RootProcedureState procStack = rollbackStack.get(rootProcId);<a name="line.1879"></a>
+<span class="sourceLineNo">1880</span>        procStack.abort();<a name="line.1880"></a>
+<span class="sourceLineNo">1881</span>        store.update(proc);<a name="line.1881"></a>
+<span class="sourceLineNo">1882</span>        scheduler.addFront(proc);<a name="line.1882"></a>
+<span class="sourceLineNo">1883</span>      }<a name="line.1883"></a>
+<span class="sourceLineNo">1884</span>    }<a name="line.1884"></a>
+<span class="sourceLineNo">1885</span>  }<a name="line.1885"></a>
+<span class="sourceLineNo">1886</span><a name="line.1886"></a>
+<span class="sourceLineNo">1887</span>  private static final class DelayedProcedure<a name="line.1887"></a>
+<span class="sourceLineNo">1888</span>      extends DelayedUtil.DelayedContainerWithTimestamp&lt;Procedure&gt; {<a name="line.1888"></a>
+<span class="sourceLineNo">1889</span>    public DelayedProcedure(final Procedure procedure) {<a name="line.1889"></a>
+<span class="sourceLineNo">1890</span>      super(procedure, procedure.getTimeoutTimestamp());<a name="line.1890"></a>
+<span class="sourceLineNo">1891</span>    }<a name="line.1891"></a>
+<span class="sourceLineNo">1892</span>  }<a name="line.1892"></a>
+<span class="sourceLineNo">1893</span><a name="line.1893"></a>
+<span class="sourceLineNo">1894</span>  private static abstract class StoppableThread extends Thread {<a name="line.1894"></a>
+<span class="sourceLineNo">1895</span>    public StoppableThread(final ThreadGroup group, final String name) {<a name="line.1895"></a>
+<span class="sourceLineNo">1896</span>      super(group, name);<a name="line.1896"></a>
+<span class="sourceLineNo">1897</span>    }<a name="line.1897"></a>
 <span class="sourceLineNo">1898</span><a name="line.1898"></a>
-<span class="sourceLineNo">1899</span>    public void awaitTermination() {<a name="line.1899"></a>
-<span class="sourceLineNo">1900</span>      try {<a name="line.1900"></a>
-<span class="sourceLineNo">1901</span>        final long startTime = EnvironmentEdgeManager.currentTime();<a name="line.1901"></a>
-<span class="sourceLineNo">1902</span>        for (int i = 0; isAlive(); ++i) {<a name="line.1902"></a>
-<span class="sourceLineNo">1903</span>          sendStopSignal();<a name="line.1903"></a>
-<span class="sourceLineNo">1904</span>          join(250);<a name="line.1904"></a>
-<span class="sourceLineNo">1905</span>          if (i &gt; 0 &amp;&amp; (i % 8) == 0) {<a name="line.1905"></a>
-<span class="sourceLineNo">1906</span>            LOG.warn("Waiting termination of thread " + getName() + ", " +<a name="line.1906"></a>
-<span class="sourceLineNo">1907</span>              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));<a name="line.1907"></a>
-<span class="sourceLineNo">1908</span>          }<a name="line.1908"></a>
-<span class="sourceLineNo">1909</span>        }<a name="line.1909"></a>
-<span class="sourceLineNo">1910</span>      } catch (InterruptedException e) {<a name="line.1910"></a>
-<span class="sourceLineNo">1911</span>        LOG.warn(getName() + " join wait got interrupted", e);<a name="line.1911"></a>
-<span class="sourceLineNo">1912</span>      }<a name="line.1912"></a>
-<span class="sourceLineNo">1913</span>    }<a name="line.1913"></a>
-<span class="sourceLineNo">1914</span>  }<a name="line.1914"></a>
-<span class="sourceLineNo">1915</span><a name="line.1915"></a>
-<span class="sourceLineNo">1916</span>  // ==========================================================================<a name="line.1916"></a>
-<span class="sourceLineNo">1917</span>  //  Inline Chores (executors internal chores)<a name="line.1917"></a>
+<span class="sourceLineNo">1899</span>    public abstract void sendStopSignal();<a name="line.1899"></a>
+<span class="sourceLineNo">1900</span><a name="line.1900"></a>
+<span class="sourceLineNo">1901</span>    public void awaitTermination() {<a name="line.1901"></a>
+<span class="sourceLineNo">1902</span>      try {<a name="line.1902"></a>
+<span class="sourceLineNo">1903</span>        final long startTime = EnvironmentEdgeManager.currentTime();<a name="line.1903"></a>
+<span class="sourceLineNo">1904</span>        for (int i = 0; isAlive(); ++i) {<a name="line.1904"></a>
+<span class="sourceLineNo">1905</span>          sendStopSignal();<a name="line.1905"></a>
+<span class="sourceLineNo">1906</span>          join(250);<a name="line.1906"></a>
+<span class="sourceLineNo">1907</span>          if (i &gt; 0 &amp;&amp; (i % 8) == 0) {<a name="line.1907"></a>
+<span class="sourceLineNo">1908</span>            LOG.warn("Waiting termination of thread " + getName() + ", " +<a name="line.1908"></a>
+<span class="sourceLineNo">1909</span>              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));<a name="line.1909"></a>
+<span class="sourceLineNo">1910</span>          }<a name="line.1910"></a>
+<span class="sourceLineNo">1911</span>        }<a name="line.1911"></a>
+<span class="sourceLineNo">1912</span>      } catch (InterruptedException e) {<a name="line.1912"></a>
+<span class="sourceLineNo">1913</span>        LOG.warn(getName() + " join wait got interrupted", e);<a name="line.1913"></a>
+<span class="sourceLineNo">1914</span>      }<a name="line.1914"></a>
+<span class="sourceLineNo">1915</span>    }<a name="line.1915"></a>
+<span class="sourceLineNo">1916</span>  }<a name="line.1916"></a>
+<span class="sourceLineNo">1917</span><a name="line.1917"></a>
 <span class="sourceLineNo">1918</span>  // ==========================================================================<a name="line.1918"></a>
-<span class="sourceLineNo">1919</span>  private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable {<a name="line.1919"></a>
-<span class="sourceLineNo">1920</span>    private long timeout;<a name="line.1920"></a>
-<span class="sourceLineNo">1921</span><a name="line.1921"></a>
-<span class="sourceLineNo">1922</span>    public abstract int getTimeoutInterval();<a name="line.1922"></a>
+<span class="sourceLineNo">1919</span>  //  Inline Chores (executors internal chores)<a name="line.1919"></a>
+<span class="sourceLineNo">1920</span>  // ==========================================================================<a name="line.1920"></a>
+<span class="sourceLineNo">1921</span>  private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable {<a name="line.1921"></a>
+<span class="sourceLineNo">1922</span>    private long timeout;<a name="line.1922"></a>
 <span class="sourceLineNo">1923</span><a name="line.1923"></a>
-<span class="sourceLineNo">1924</span>    protected void refreshTimeout() {<a name="line.1924"></a>
-<span class="sourceLineNo">1925</span>      this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();<a name="line.1925"></a>
-<span class="sourceLineNo">1926</span>    }<a name="line.1926"></a>
-<span class="sourceLineNo">1927</span><a name="line.1927"></a>
-<span class="sourceLineNo">1928</span>    @Override<a name="line.1928"></a>
-<span class="sourceLineNo">1929</span>    public long getTimeout() {<a name="line.1929"></a>
-<span class="sourceLineNo">1930</span>      return timeout;<a name="line.1930"></a>
-<span class="sourceLineNo">1931</span>    }<a name="line.1931"></a>
-<span class="sourceLineNo">1932</span>  }<a name="line.1932"></a>
-<span class="sourceLineNo">1933</span><a name="line.1933"></a>
-<span class="sourceLineNo">1934</span>  // ----------------------------------------------------------------------------<a name="line.1934"></a>
-<span class="sourceLineNo">1935</span>  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.1935"></a>
-<span class="sourceLineNo">1936</span>  // full set of procedures pending and completed to write a compacted<a name="line.1936"></a>
-<span class="sourceLineNo">1937</span>  // version of the log (in case is a log)?<a name="line.1937"></a>
-<span class="sourceLineNo">1938</span>  // In theory no, procedures are have a short life, so at some point the store<a name="line.1938"></a>
-<span class="sourceLineNo">1939</span>  // will have the tracker saying everything is in the last log.<a name="line.1939"></a>
-<span class="sourceLineNo">1940</span>  // ----------------------------------------------------------------------------<a name="line.1940"></a>
-<span class="sourceLineNo">1941</span><a name="line.1941"></a>
-<span class="sourceLineNo">1942</span>  private final class WorkerMonitor extends InlineChore {<a name="line.1942"></a>
-<span class="sourceLineNo">1943</span>    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.1943"></a>
-<span class="sourceLineNo">1944</span>        "hbase.procedure.worker.monitor.interval.msec";<a name="line.1944"></a>
-<span class="sourceLineNo">1945</span>    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.1945"></a>
-<span class="sourceLineNo">1946</span><a name="line.1946"></a>
-<span class="sourceLineNo">1947</span>    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.1947"></a>
-<span class="sourceLineNo">1948</span>        "hbase.procedure.worker.stuck.threshold.msec";<a name="line.1948"></a>
-<span class="sourceLineNo">1949</span>    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.1949"></a>
-<span class="sourceLineNo">1950</span><a name="line.1950"></a>
-<span class="sourceLineNo">1951</span>    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.1951"></a>
-<span class="sourceLineNo">1952</span>        "hbase.procedure.worker.add.stuck.percentage";<a name="line.1952"></a>
-<span class="sourceLineNo">1953</span>    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.1953"></a>
-<span class="sourceLineNo">1954</span><a name="line.1954"></a>
-<span class="sourceLineNo">1955</span>    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.1955"></a>
-<span class="sourceLineNo">1956</span>    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.1956"></a>
-<span class="sourceLineNo">1957</span>    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.1957"></a>
-<span class="sourceLineNo">1958</span><a name="line.1958"></a>
-<span class="sourceLineNo">1959</span>    public WorkerMonitor() {<a name="line.1959"></a>
-<span class="sourceLineNo">1960</span>      refreshConfig();<a name="line.1960"></a>
-<span class="sourceLineNo">1961</span>    }<a name="line.1961"></a>
-<span class="sourceLineNo">1962</span><a name="line.1962"></a>
-<span class="sourceLineNo">1963</span>    @Override<a name="line.1963"></a>
-<span class="sourceLineNo">1964</span>    public void run() {<a name="line.1964"></a>
-<span class="sourceLineNo">1965</span>      final int stuckCount = checkForStuckWorkers();<a name="line.1965"></a>
-<span class="sourceLineNo">1966</span>      checkThreadCount(stuckCount);<a name="line.1966"></a>
-<span class="sourceLineNo">1967</span><a name="line.1967"></a>
-<span class="sourceLineNo">1968</span>      // refresh interval (poor man dynamic conf update)<a name="line.1968"></a>
-<span class="sourceLineNo">1969</span>      refreshConfig();<a name="line.1969"></a>
-<span class="sourceLineNo">1970</span>    }<a name="line.1970"></a>
-<span class="sourceLineNo">1971</span><a name="line.1971"></a>
-<span class="sourceLineNo">1972</span>    private int checkForStuckWorkers() {<a name="line.1972"></a>
-<span class="sourceLineNo">1973</span>      // check if any of the worker is stuck<a name="line.1973"></a>
-<span class="sourceLineNo">1974</span>      int stuckCount = 0;<a name="line.1974"></a>
-<span class="sourceLineNo">1975</span>      for (WorkerThread worker: workerThreads) {<a name="line.1975"></a>
-<span class="sourceLineNo">1976</span>        if (worker.getCurrentRunTime() &lt; stuckThreshold) {<a name="line.1976"></a>
-<span class="sourceLineNo">1977</span>          continue;<a name="line.1977"></a>
-<span class="sourceLineNo">1978</span>        }<a name="line.1978"></a>
-<span class="sourceLineNo">1979</span><a name="line.1979"></a>
-<span class="sourceLineNo">1980</span>        // WARN the worker is stuck<a name="line.1980"></a>
-<span class="sourceLineNo">1981</span>        stuckCount++;<a name="line.1981"></a>
-<span class="sourceLineNo">1982</span>        LOG.warn("Worker stuck " + worker +<a name="line.1982"></a>
-<span class="sourceLineNo">1983</span>            " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.1983"></a>
-<span class="sourceLineNo">1984</span>      }<a name="line.1984"></a>
-<span class="sourceLineNo">1985</span>      return stuckCount;<a name="line.1985"></a>
-<span class="sourceLineNo">1986</span>    }<a name="line.1986"></a>
-<span class="sourceLineNo">1987</span><a name="line.1987"></a>
-<span class="sourceLineNo">1988</span>    private void checkThreadCount(final int stuckCount) {<a name="line.1988"></a>
-<span class="sourceLineNo">1989</span>      // nothing to do if there are no runnable tasks<a name="line.1989"></a>
-<span class="sourceLineNo">1990</span>      if (stuckCount &lt; 1 || !scheduler.hasRunnables()) return;<a name="line.1990"></a>
-<span class="sourceLineNo">1991</span><a name="line.1991"></a>
-<span class="sourceLineNo">1992</span>      // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.1992"></a>
-<span class="sourceLineNo">1993</span>      // and every handler is active.<a name="line.1993"></a>
-<span class="sourceLineNo">1994</span>      final float stuckPerc = ((float)stuckCount) / workerThreads.size();<a name="line.1994"></a>
-<span class="sourceLineNo">1995</span>      if (stuckPerc &gt;= addWorkerStuckPercentage &amp;&amp;<a name="line.1995"></a>
-<span class="sourceLineNo">1996</span>          activeExecutorCount.get() == workerThreads.size()) {<a name="line.1996"></a>
-<span class="sourceLineNo">1997</span>        final WorkerThread worker = new WorkerThread(threadGroup);<a name="line.1997"></a>
-<span class="sourceLineNo">1998</span>        workerThreads.add(worker);<a name="line.1998"></a>
-<span class="sourceLineNo">1999</span>        worker.start();<a name="line.1999"></a>
-<span class="sourceLineNo">2000</span>        LOG.debug("Added new worker thread " + worker);<a name="line.2000"></a>
-<span class="sourceLineNo">2001</span>      }<a name="line.2001"></a>
-<span class="sourceLineNo">2002</span>    }<a name="line.2002"></a>
-<span class="sourceLineNo">2003</span><a name="line.2003"></a>
-<span class="sourceLineNo">2004</span>    private void refreshConfig() {<a name="line.2004"></a>
-<span class="sourceLineNo">2005</span>      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2005"></a>
-<span class="sourceLineNo">2006</span>          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2006"></a>
-<span class="sourceLineNo">2007</span>      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2007"></a>
-<span class="sourceLineNo">2008</span>        DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2008"></a>
-<span class="sourceLineNo">2009</span>      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2009"></a>
-<span class="sourceLineNo">2010</span>        DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2010"></a>
-<span class="sourceLineNo">2011</span>    }<a name="line.2011"></a>
-<span class="sourceLineNo">2012</span><a name="line.2012"></a>
-<span class="sourceLineNo">2013</span>    @Override<a name="line.2013"></a>
-<span class="sourceLineNo">2014</span>    public int getTimeoutInterval() {<a name="line.2014"></a>
-<span class="sourceLineNo">2015</span>      return timeoutInterval;<a name="line.2015"></a>
-<span class="sourceLineNo">2016</span>    }<a name="line.2016"></a>
-<span class="sourceLineNo">2017</span>  }<a name="line.2017"></a>
-<span class="sourceLineNo">2018</span>}<a name="line.2018"></a>
+<span class="sourceLineNo">1924</span>    public abstract int getTimeoutInterval();<a name="line.1924"></a>
+<span class="sourceLineNo">1925</span><a name="line.1925"></a>
+<span class="sourceLineNo">1926</span>    protected void refreshTimeout() {<a name="line.1926"></a>
+<span class="sourceLineNo">1927</span>      this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();<a name="line.1927"></a>
+<span class="sourceLineNo">1928</span>    }<a name="line.1928"></a>
+<span class="sourceLineNo">1929</span><a name="line.1929"></a>
+<span class="sourceLineNo">1930</span>    @Override<a name="line.1930"></a>
+<span class="sourceLineNo">1931</span>    public long getTimeout() {<a name="line.1931"></a>
+<span class="sourceLineNo">1932</span>      return timeout;<a name="line.1932"></a>
+<span class="sourceLineNo">1933</span>    }<a name="line.1933"></a>
+<span class="sourceLineNo">1934</span>  }<a name="line.1934"></a>
+<span class="sourceLineNo">1935</span><a name="line.1935"></a>
+<span class="sourceLineNo">1936</span>  // ----------------------------------------------------------------------------<a name="line.1936"></a>
+<span class="sourceLineNo">1937</span>  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.1937"></a>
+<span class="sourceLineNo">1938</span>  // full set of procedures pending and completed to write a compacted<a name="line.1938"></a>
+<span class="sourceLineNo">1939</span>  // version of the log (in case is a log)?<a name="line.1939"></a>
+<span class="sourceLineNo">1940</span>  // In theory no, procedures are have a short life, so at some point the store<a name="line.1940"></a>
+<span class="sourceLineNo">1941</span>  // will have the tracker saying everything is in the last log.<a name="line.1941"></a>
+<span class="sourceLineNo">1942</span>  // ----------------------------------------------------------------------------<a name="line.1942"></a>
+<span class="sourceLineNo">1943</span><a name="line.1943"></a>
+<span class="sourceLineNo">1944</span>  private final class WorkerMonitor extends InlineChore {<a name="line.1944"></a>
+<span class="sourceLineNo">1945</span>    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.1945"></a>
+<span class="sourceLineNo">1946</span>        "hbase.procedure.worker.monitor.interval.msec";<a name="line.1946"></a>
+<span class="sourceLineNo">1947</span>    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.1947"></a>
+<span class="sourceLineNo">1948</span><a name="line.1948"></a>
+<span class="sourceLineNo">1949</span>    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.1949"></a>
+<span class="sourceLineNo">1950</span>        "hbase.procedure.worker.stuck.threshold.msec";<a name="line.1950"></a>
+<span class="sourceLineNo">1951</span>    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.1951"></a>
+<span class="sourceLineNo">1952</span><a name="line.1952"></a>
+<span class="sourceLineNo">1953</span>    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.1953"></a>
+<span class="sourceLineNo">1954</span>        "hbase.procedure.worker.add.stuck.percentage";<a name="line.1954"></a>
+<span class="sourceLineNo">1955</span>    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.1955"></a>
+<span class="sourceLineNo">1956</span><a name="line.1956"></a>
+<span class="sourceLineNo">1957</span>    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.1957"></a>
+<span class="sourceLineNo">1958</span>    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.1958"></a>
+<span class="sourceLineNo">1959</span>    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.1959"></a>
+<span class="sourceLineNo">1960</span><a name="line.1960"></a>
+<span class="sourceLineNo">1961</span>    public WorkerMonitor() {<a name="line.1961"></a>
+<span class="sourceLineNo">1962</span>      refreshConfig();<a name="line.1962"></a>
+<span class="sourceLineNo">1963</span>    }<a name="line.1963"></a>
+<span class="sourceLineNo">1964</span><a name="line.1964"></a>
+<span class="sourceLineNo">1965</span>    @Override<a name="line.1965"></a>
+<span class="sourceLineNo">1966</span>    public void run() {<a name="line.1966"></a>
+<span class="sourceLineNo">1967</span>      final int stuckCount = checkForStuckWorkers();<a name="line.1967"></a>
+<span class="sourceLineNo">1968</span>      checkThreadCount(stuckCount);<a name="line.1968"></a>
+<span class="sourceLineNo">1969</span><a name="line.1969"></a>
+<span class="sourceLineNo">1970</span>      // refresh interval (poor man dynamic conf update)<a name="line.1970"></a>
+<span class="sourceLineNo">1971</span>      refreshConfig();<a name="line.1971"></a>
+<span class="sourceLineNo">1972</span>    }<a name="line.1972"></a>
+<span class="sourceLineNo">1973</span><a name="line.1973"></a>
+<span class="sourceLineNo">1974</span>    private int checkForStuckWorkers() {<a name="line.1974"></a>
+<span class="sourceLineNo">1975</span>      // check if any of the worker is stuck<a name="line.1975"></a>
+<span class="sourceLineNo">1976</span>      int stuckCount = 0;<a name="line.1976"></a>
+<span class="sourceLineNo">1977</span>      for (WorkerThread worker: workerThreads) {<a name="line.1977"></a>
+<span class="sourceLineNo">1978</span>        if (worker.getCurrentRunTime() &lt; stuckThreshold) {<a name="line.1978"></a>
+<span class="sourceLineNo">1979</span>          continue;<a name="line.1979"></a>
+<span class="sourceLineNo">1980</span>        }<a name="line.1980"></a>
+<span class="sourceLineNo">1981</span><a name="line.1981"></a>
+<span class="sourceLineNo">1982</span>        // WARN the worker is stuck<a name="line.1982"></a>
+<span class="sourceLineNo">1983</span>        stuckCount++;<a name="line.1983"></a>
+<span class="sourceLineNo">1984</span>        LOG.warn("Worker stuck " + worker +<a name="line.1984"></a>
+<span class="sourceLineNo">1985</span>            " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.1985"></a>
+<span class="sourceLineNo">1986</span>      }<a name="line.1986"></a>
+<span class="sourceLineNo">1987</span>      return stuckCount;<a name="line.1987"></a>
+<span class="sourceLineNo">1988</span>    }<a name="line.1988"></a>
+<span class="sourceLineNo">1989</span><a name="line.1989"></a>
+<span class="sourceLineNo">1990</span>    private void checkThreadCount(final int stuckCount) {<a name="line.1990"></a>
+<span class="sourceLineNo">1991</span>      // nothing to do if there are no runnable tasks<a name="line.1991"></a>
+<span class="sourceLineNo">1992</span>      if (stuckCount &lt; 1 || !scheduler.hasRunnables()) return;<a name="line.1992"></a>
+<span class="sourceLineNo">1993</span><a name="line.1993"></a>
+<span class="sourceLineNo">1994</span>      // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.1994"></a>
+<span class="sourceLineNo">1995</span>      // and every handler is active.<a name="line.1995"></a>
+<span class="sourceLineNo">1996</span>      final float stuckPerc = ((float)stuckCount) / workerThreads.size();<a name="line.1996"></a>
+<span class="sourceLineNo">1997</span>      if (stuckPerc &gt;= addWorkerStuckPercentage &amp;&amp;<a name="line.1997"></a>
+<span class="sourceLineNo">1998</span>          activeExecutorCount.get() == workerThreads.size()) {<a name="line.1998"></a>
+<span class="sourceLineNo">1999</span>        final WorkerThread worker = new WorkerThread(threadGroup);<a name="line.1999"></a>
+<span class="sourceLineNo">2000</span>        workerThreads.add(worker);<a name="line.2000"></a>
+<span class="sourceLineNo">2001</span>        worker.start();<a name="line.2001"></a>
+<span class="sourceLineNo">2002</span>        LOG.debug("Added new worker thread " + worker);<a name="line.2002"></a>
+<span class="sourceLineNo">2003</span>      }<a name="line.2003"></a>
+<span class="sourceLineNo">2004</span>    }<a name="line.2004"></a>
+<span class="sourceLineNo">2005</span><a name="line.2005"></a>
+<span class="sourceLineNo">2006</span>    private void refreshConfig() {<a name="line.2006"></a>
+<span class="sourceLineNo">2007</span>      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2007"></a>
+<span class="sourceLineNo">2008</span>          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2008"></a>
+<span class="sourceLineNo">2009</span>      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2009"></a>
+<span class="sourceLineNo">2010</span>        DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2010"></a>
+<span class="sourceLineNo">2011</span>      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2011"></a>
+<span class="sourceLineNo">2012</span>        DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2012"></a>
+<span class="sourceLineNo">2013</span>    }<a name="line.2013"></a>
+<span class="sourceLineNo">2014</span><a name="line.2014"></a>
+<span class="sourceLineNo">2015</span>    @Override<a name="line.2015"></a>
+<span class="sourceLineNo">2016</span>    public int getTimeoutInterval() {<a name="line.2016"></a>
+<span class="sourceLineNo">2017</span>      return timeoutInterval;<a name="line.2017"></a>
+<span class="sourceLineNo">2018</span>    }<a name="line.2018"></a>
+<span class="sourceLineNo">2019</span>  }<a name="line.2019"></a>
+<span class="sourceLineNo">2020</span>}<a name="line.2020"></a>
 
 
 


Mime
View raw message