Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6894D200C8F for ; Thu, 25 May 2017 16:59:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 671D3160BE0; Thu, 25 May 2017 14:59:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0D1EB160BC7 for ; Thu, 25 May 2017 16:59:37 +0200 (CEST) Received: (qmail 98392 invoked by uid 500); 25 May 2017 14:59:36 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 96698 invoked by uid 99); 25 May 2017 14:59:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 May 2017 14:59:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 040D0F219F; Thu, 25 May 2017 14:59:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 25 May 2017 14:59:46 -0000 Message-Id: In-Reply-To: <8d5120da9fc94a3bba89b4778cc295d5@git.apache.org> References: <8d5120da9fc94a3bba89b4778cc295d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/37] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Thu, 25 May 2017 14:59:40 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.ObservedExceptionsInBatch.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.ObservedExceptionsInBatch.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.ObservedExceptionsInBatch.html index fe28fe2..86378be 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.ObservedExceptionsInBatch.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.ObservedExceptionsInBatch.html @@ -1765,6408 +1765,6439 @@ 1757 } 1758 } 1759 -1760 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( -1761 final String threadNamePrefix) { -1762 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); -1763 int maxThreads = Math.min(numStores, -1764 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, -1765 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)); -1766 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); -1767 } -1768 -1769 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool( -1770 final String threadNamePrefix) { -1771 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); -1772 int maxThreads = Math.max(1, -1773 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, -1774 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX) -1775 / numStores); -1776 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); -1777 } -1778 -1779 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, -1780 final String threadNamePrefix) { -1781 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, -1782 new ThreadFactory() { -1783 private int count = 1; -1784 -1785 @Override -1786 public Thread newThread(Runnable r) { -1787 return new Thread(r, threadNamePrefix + "-" + count++); -1788 } -1789 }); +1760 @Override +1761 public void waitForFlushes() { +1762 synchronized (writestate) { +1763 if (this.writestate.readOnly) { +1764 // we should not wait for replayed flushed if we are read only (for example in case the +1765 // region is a secondary replica). +1766 return; +1767 } +1768 if (!writestate.flushing) return; +1769 long start = System.currentTimeMillis(); +1770 boolean interrupted = false; +1771 try { +1772 while (writestate.flushing) { +1773 LOG.debug("waiting for cache flush to complete for region " + this); +1774 try { +1775 writestate.wait(); +1776 } catch (InterruptedException iex) { +1777 // essentially ignore and propagate the interrupt back up +1778 LOG.warn("Interrupted while waiting"); +1779 interrupted = true; +1780 } +1781 } +1782 } finally { +1783 if (interrupted) { +1784 Thread.currentThread().interrupt(); +1785 } +1786 } +1787 long duration = System.currentTimeMillis() - start; +1788 LOG.debug("Waited " + duration + " ms for flush to complete"); +1789 } 1790 } -1791 -1792 /** -1793 * @return True if its worth doing a flush before we put up the close flag. -1794 */ -1795 private boolean worthPreFlushing() { -1796 return this.memstoreDataSize.get() > -1797 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); +1791 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( +1792 final String threadNamePrefix) { +1793 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); +1794 int maxThreads = Math.min(numStores, +1795 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, +1796 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)); +1797 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); 1798 } 1799 -1800 ////////////////////////////////////////////////////////////////////////////// -1801 // HRegion accessors -1802 ////////////////////////////////////////////////////////////////////////////// -1803 -1804 @Override -1805 public HTableDescriptor getTableDesc() { -1806 return this.htableDescriptor; -1807 } -1808 -1809 /** @return WAL in use for this region */ -1810 public WAL getWAL() { -1811 return this.wal; -1812 } -1813 -1814 /** -1815 * @return split policy for this region. -1816 */ -1817 public RegionSplitPolicy getSplitPolicy() { -1818 return this.splitPolicy; -1819 } -1820 -1821 /** -1822 * A split takes the config from the parent region & passes it to the daughter -1823 * region's constructor. If 'conf' was passed, you would end up using the HTD -1824 * of the parent region in addition to the new daughter HTD. Pass 'baseConf' -1825 * to the daughter regions to avoid this tricky dedupe problem. -1826 * @return Configuration object -1827 */ -1828 Configuration getBaseConf() { -1829 return this.baseConf; -1830 } -1831 -1832 /** @return {@link FileSystem} being used by this region */ -1833 public FileSystem getFilesystem() { -1834 return fs.getFileSystem(); -1835 } -1836 -1837 /** @return the {@link HRegionFileSystem} used by this region */ -1838 public HRegionFileSystem getRegionFileSystem() { -1839 return this.fs; -1840 } -1841 -1842 @Override -1843 public long getEarliestFlushTimeForAllStores() { -1844 return Collections.min(lastStoreFlushTimeMap.values()); -1845 } -1846 -1847 @Override -1848 public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException { -1849 long result = Long.MAX_VALUE; -1850 for (Store store : getStores()) { -1851 Collection<StoreFile> storeFiles = store.getStorefiles(); -1852 if (storeFiles == null) continue; -1853 for (StoreFile file : storeFiles) { -1854 StoreFileReader sfReader = file.getReader(); -1855 if (sfReader == null) continue; -1856 HFile.Reader reader = sfReader.getHFileReader(); -1857 if (reader == null) continue; -1858 if (majorCompactionOnly) { -1859 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY); -1860 if (val == null || !Bytes.toBoolean(val)) continue; -1861 } -1862 result = Math.min(result, reader.getFileContext().getFileCreateTime()); -1863 } -1864 } -1865 return result == Long.MAX_VALUE ? 0 : result; +1800 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool( +1801 final String threadNamePrefix) { +1802 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); +1803 int maxThreads = Math.max(1, +1804 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, +1805 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX) +1806 / numStores); +1807 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix); +1808 } +1809 +1810 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads, +1811 final String threadNamePrefix) { +1812 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, +1813 new ThreadFactory() { +1814 private int count = 1; +1815 +1816 @Override +1817 public Thread newThread(Runnable r) { +1818 return new Thread(r, threadNamePrefix + "-" + count++); +1819 } +1820 }); +1821 } +1822 +1823 /** +1824 * @return True if its worth doing a flush before we put up the close flag. +1825 */ +1826 private boolean worthPreFlushing() { +1827 return this.memstoreDataSize.get() > +1828 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); +1829 } +1830 +1831 ////////////////////////////////////////////////////////////////////////////// +1832 // HRegion accessors +1833 ////////////////////////////////////////////////////////////////////////////// +1834 +1835 @Override +1836 public HTableDescriptor getTableDesc() { +1837 return this.htableDescriptor; +1838 } +1839 +1840 /** @return WAL in use for this region */ +1841 public WAL getWAL() { +1842 return this.wal; +1843 } +1844 +1845 /** +1846 * @return split policy for this region. +1847 */ +1848 public RegionSplitPolicy getSplitPolicy() { +1849 return this.splitPolicy; +1850 } +1851 +1852 /** +1853 * A split takes the config from the parent region & passes it to the daughter +1854 * region's constructor. If 'conf' was passed, you would end up using the HTD +1855 * of the parent region in addition to the new daughter HTD. Pass 'baseConf' +1856 * to the daughter regions to avoid this tricky dedupe problem. +1857 * @return Configuration object +1858 */ +1859 Configuration getBaseConf() { +1860 return this.baseConf; +1861 } +1862 +1863 /** @return {@link FileSystem} being used by this region */ +1864 public FileSystem getFilesystem() { +1865 return fs.getFileSystem(); 1866 } 1867 -1868 RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) { -1869 long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId; -1870 byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes(); -1871 regionLoadBldr.clearStoreCompleteSequenceId(); -1872 for (byte[] familyName : this.stores.keySet()) { -1873 long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); -1874 // Subtract - 1 to go earlier than the current oldest, unflushed edit in memstore; this will -1875 // give us a sequence id that is for sure flushed. We want edit replay to start after this -1876 // sequence id in this region. If NO_SEQNUM, use the regions maximum flush id. -1877 long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1; -1878 regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.newBuilder() -1879 .setFamilyName(UnsafeByteOperations.unsafeWrap(familyName)).setSequenceId(csid).build()); -1880 } -1881 return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId()); -1882 } -1883 -1884 ////////////////////////////////////////////////////////////////////////////// -1885 // HRegion maintenance. -1886 // -1887 // These methods are meant to be called periodically by the HRegionServer for -1888 // upkeep. -1889 ////////////////////////////////////////////////////////////////////////////// -1890 -1891 /** @return returns size of largest HStore. */ -1892 public long getLargestHStoreSize() { -1893 long size = 0; -1894 for (Store h : stores.values()) { -1895 long storeSize = h.getSize(); -1896 if (storeSize > size) { -1897 size = storeSize; -1898 } -1899 } -1900 return size; -1901 } -1902 -1903 /* -1904 * Do preparation for pending compaction. -1905 * @throws IOException -1906 */ -1907 protected void doRegionCompactionPrep() throws IOException { -1908 } -1909 -1910 @Override -1911 public void triggerMajorCompaction() throws IOException { -1912 for (Store s : getStores()) { -1913 s.triggerMajorCompaction(); -1914 } -1915 } -1916 -1917 @Override -1918 public void compact(final boolean majorCompaction) throws IOException { -1919 if (majorCompaction) { -1920 triggerMajorCompaction(); -1921 } -1922 for (Store s : getStores()) { -1923 CompactionContext compaction = s.requestCompaction(); -1924 if (compaction != null) { -1925 ThroughputController controller = null; -1926 if (rsServices != null) { -1927 controller = CompactionThroughputControllerFactory.create(rsServices, conf); -1928 } -1929 if (controller == null) { -1930 controller = NoLimitThroughputController.INSTANCE; -1931 } -1932 compact(compaction, s, controller, null); -1933 } -1934 } -1935 } -1936 -1937 /** -1938 * This is a helper function that compact all the stores synchronously -1939 * It is used by utilities and testing -1940 * -1941 * @throws IOException e -1942 */ -1943 public void compactStores() throws IOException { -1944 for (Store s : getStores()) { -1945 CompactionContext compaction = s.requestCompaction(); -1946 if (compaction != null) { -1947 compact(compaction, s, NoLimitThroughputController.INSTANCE, null); -1948 } -1949 } -1950 } -1951 -1952 /** -1953 * This is a helper function that compact the given store -1954 * It is used by utilities and testing -1955 * -1956 * @throws IOException e -1957 */ -1958 @VisibleForTesting -1959 void compactStore(byte[] family, ThroughputController throughputController) -1960 throws IOException { -1961 Store s = getStore(family); -1962 CompactionContext compaction = s.requestCompaction(); -1963 if (compaction != null) { -1964 compact(compaction, s, throughputController, null); +1868 /** @return the {@link HRegionFileSystem} used by this region */ +1869 public HRegionFileSystem getRegionFileSystem() { +1870 return this.fs; +1871 } +1872 +1873 @Override +1874 public long getEarliestFlushTimeForAllStores() { +1875 return Collections.min(lastStoreFlushTimeMap.values()); +1876 } +1877 +1878 @Override +1879 public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException { +1880 long result = Long.MAX_VALUE; +1881 for (Store store : getStores()) { +1882 Collection<StoreFile> storeFiles = store.getStorefiles(); +1883 if (storeFiles == null) continue; +1884 for (StoreFile file : storeFiles) { +1885 StoreFileReader sfReader = file.getReader(); +1886 if (sfReader == null) continue; +1887 HFile.Reader reader = sfReader.getHFileReader(); +1888 if (reader == null) continue; +1889 if (majorCompactionOnly) { +1890 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY); +1891 if (val == null || !Bytes.toBoolean(val)) continue; +1892 } +1893 result = Math.min(result, reader.getFileContext().getFileCreateTime()); +1894 } +1895 } +1896 return result == Long.MAX_VALUE ? 0 : result; +1897 } +1898 +1899 RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) { +1900 long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId; +1901 byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes(); +1902 regionLoadBldr.clearStoreCompleteSequenceId(); +1903 for (byte[] familyName : this.stores.keySet()) { +1904 long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); +1905 // Subtract - 1 to go earlier than the current oldest, unflushed edit in memstore; this will +1906 // give us a sequence id that is for sure flushed. We want edit replay to start after this +1907 // sequence id in this region. If NO_SEQNUM, use the regions maximum flush id. +1908 long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1; +1909 regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.newBuilder() +1910 .setFamilyName(UnsafeByteOperations.unsafeWrap(familyName)).setSequenceId(csid).build()); +1911 } +1912 return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId()); +1913 } +1914 +1915 ////////////////////////////////////////////////////////////////////////////// +1916 // HRegion maintenance. +1917 // +1918 // These methods are meant to be called periodically by the HRegionServer for +1919 // upkeep. +1920 ////////////////////////////////////////////////////////////////////////////// +1921 +1922 /** @return returns size of largest HStore. */ +1923 public long getLargestHStoreSize() { +1924 long size = 0; +1925 for (Store h : stores.values()) { +1926 long storeSize = h.getSize(); +1927 if (storeSize > size) { +1928 size = storeSize; +1929 } +1930 } +1931 return size; +1932 } +1933 +1934 /* +1935 * Do preparation for pending compaction. +1936 * @throws IOException +1937 */ +1938 protected void doRegionCompactionPrep() throws IOException { +1939 } +1940 +1941 @Override +1942 public void triggerMajorCompaction() throws IOException { +1943 for (Store s : getStores()) { +1944 s.triggerMajorCompaction(); +1945 } +1946 } +1947 +1948 @Override +1949 public void compact(final boolean majorCompaction) throws IOException { +1950 if (majorCompaction) { +1951 triggerMajorCompaction(); +1952 } +1953 for (Store s : getStores()) { +1954 CompactionContext compaction = s.requestCompaction(); +1955 if (compaction != null) { +1956 ThroughputController controller = null; +1957 if (rsServices != null) { +1958 controller = CompactionThroughputControllerFactory.create(rsServices, conf); +1959 } +1960 if (controller == null) { +1961 controller = NoLimitThroughputController.INSTANCE; +1962 } +1963 compact(compaction, s, controller, null); +1964 } 1965 } 1966 } 1967 -1968 /* -1969 * Called by compaction thread and after region is opened to compact the -1970 * HStores if necessary. +1968 /** +1969 * This is a helper function that compact all the stores synchronously +1970 * It is used by utilities and testing 1971 * -1972 * <p>This operation could block for a long time, so don't call it from a -1973 * time-sensitive thread. -1974 * -1975 * Note that no locking is necessary at this level because compaction only -1976 * conflicts with a region split, and that cannot happen because the region -1977 * server does them sequentially and not in parallel. -1978 * -1979 * @param compaction Compaction details, obtained by requestCompaction() -1980 * @param throughputController -1981 * @return whether the compaction completed -1982 */ -1983 public boolean compact(CompactionContext compaction, Store store, -1984 ThroughputController throughputController) throws IOException { -1985 return compact(compaction, store, throughputController, null); -1986 } -1987 -1988 public boolean compact(CompactionContext compaction, Store store, -1989 ThroughputController throughputController, User user) throws IOException { -1990 assert compaction != null && compaction.hasSelection(); -1991 assert !compaction.getRequest().getFiles().isEmpty(); -1992 if (this.closing.get() || this.closed.get()) { -1993 LOG.debug("Skipping compaction on " + this + " because closing/closed"); -1994 store.cancelRequestedCompaction(compaction); -1995 return false; +1972 * @throws IOException e +1973 */ +1974 public void compactStores() throws IOException { +1975 for (Store s : getStores()) { +1976 CompactionContext compaction = s.requestCompaction(); +1977 if (compaction != null) { +1978 compact(compaction, s, NoLimitThroughputController.INSTANCE, null); +1979 } +1980 } +1981 } +1982 +1983 /** +1984 * This is a helper function that compact the given store +1985 * It is used by utilities and testing +1986 * +1987 * @throws IOException e +1988 */ +1989 @VisibleForTesting +1990 void compactStore(byte[] family, ThroughputController throughputController) +1991 throws IOException { +1992 Store s = getStore(family); +1993 CompactionContext compaction = s.requestCompaction(); +1994 if (compaction != null) { +1995 compact(compaction, s, throughputController, null); 1996 } -1997 MonitoredTask status = null; -1998 boolean requestNeedsCancellation = true; -1999 /* -2000 * We are trying to remove / relax the region read lock for compaction. -2001 * Let's see what are the potential race conditions among the operations (user scan, -2002 * region split, region close and region bulk load). -2003 * -2004 * user scan ---> region read lock -2005 * region split --> region close first --> region write lock -2006 * region close --> region write lock -2007 * region bulk load --> region write lock -2008 * -2009 * read lock is compatible with read lock. ---> no problem with user scan/read -2010 * region bulk load does not cause problem for compaction (no consistency problem, store lock -2011 * will help the store file accounting). -2012 * They can run almost concurrently at the region level. -2013 * -2014 * The only remaining race condition is between the region close and compaction. -2015 * So we will evaluate, below, how region close intervenes with compaction if compaction does -2016 * not acquire region read lock. -2017 * -2018 * Here are the steps for compaction: -2019 * 1. obtain list of StoreFile's -2020 * 2. create StoreFileScanner's based on list from #1 -2021 * 3. perform compaction and save resulting files under tmp dir -2022 * 4. swap in compacted files -2023 * -2024 * #1 is guarded by store lock. This patch does not change this --> no worse or better -2025 * For #2, we obtain smallest read point (for region) across all the Scanners (for both default -2026 * compactor and stripe compactor). -2027 * The read points are for user scans. Region keeps the read points for all currently open -2028 * user scanners. -2029 * Compaction needs to know the smallest read point so that during re-write of the hfiles, -2030 * it can remove the mvcc points for the cells if their mvccs are older than the smallest -2031 * since they are not needed anymore. -2032 * This will not conflict with compaction. -2033 * For #3, it can be performed in parallel to other operations. -2034 * For #4 bulk load and compaction don't conflict with each other on the region level -2035 * (for multi-family atomicy). -2036 * Region close and compaction are guarded pretty well by the 'writestate'. -2037 * In HRegion#doClose(), we have : -2038 * synchronized (writestate) { -2039 * // Disable compacting and flushing by background threads for this -2040 * // region. -2041 * canFlush = !writestate.readOnly; -2042 * writestate.writesEnabled = false; -2043 * LOG.debug("Closing " + this + ": disabling compactions & flushes"); -2044 * waitForFlushesAndCompactions(); -2045 * } -2046 * waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0. -2047 * and in HRegion.compact() -2048 * try { -2049 * synchronized (writestate) { -2050 * if (writestate.writesEnabled) { -2051 * wasStateSet = true; -2052 * ++writestate.compacting; -2053 * } else { -2054 * String msg = "NOT compacting region " + this + ". Writes disabled."; -2055 * LOG.info(msg); -2056 * status.abort(msg); -2057 * return false; -2058 * } -2059 * } -2060 * Also in compactor.performCompaction(): -2061 * check periodically to see if a system stop is requested -2062 * if (closeCheckInterval > 0) { -2063 * bytesWritten += len; -2064 * if (bytesWritten > closeCheckInterval) { -2065 * bytesWritten = 0; -2066 * if (!store.areWritesEnabled()) { -2067 * progress.cancel(); -2068 * return false; -2069 * } -2070 * } -2071 * } -2072 */ -2073 try { -2074 byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); -2075 if (stores.get(cf) != store) { -2076 LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this -2077 + " has been re-instantiated, cancel this compaction request. " -2078 + " It may be caused by the roll back of split transaction"); -2079 return false; -2080 } -2081 -2082 status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); -2083 if (this.closed.get()) { -2084 String msg = "Skipping compaction on " + this + " because closed"; -2085 LOG.debug(msg); -2086 status.abort(msg); -2087 return false; -2088 } -2089 boolean wasStateSet = false; -2090 try { -2091 synchronized (writestate) { -2092 if (writestate.writesEnabled) { -2093 wasStateSet = true; -2094 writestate.compacting.incrementAndGet(); -2095 } else { -2096 String msg = "NOT compacting region " + this + ". Writes disabled."; -2097 LOG.info(msg); -2098 status.abort(msg); -2099 return false; -2100 } -2101 } -2102 LOG.info("Starting compaction on " + store + " in region " + this -2103 + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":"")); -2104 doRegionCompactionPrep(); -2105 try { -2106 status.setStatus("Compacting store " + store); -2107 // We no longer need to cancel the request on the way out of this -2108 // method because Store#compact will clean up unconditionally -2109 requestNeedsCancellation = false; -2110 store.compact(compaction, throughputController, user); -2111 } catch (InterruptedIOException iioe) { -2112 String msg = "compaction interrupted"; -2113 LOG.info(msg, iioe); -2114 status.abort(msg); -2115 return false; -2116 } -2117 } finally { -2118 if (wasStateSet) { -2119 synchronized (writestate) { -2120 writestate.compacting.decrementAndGet(); -2121 if (writestate.compacting.get() <= 0) { -2122 writestate.notifyAll(); -2123 } -2124 } -2125 } -2126 } -2127 status.markComplete("Compaction complete"); -2128 return true; -2129 } finally { -2130 if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); -2131 if (status != null) status.cleanup(); -2132 } -2133 } -2134 -2135 @Override -2136 public FlushResult flush(boolean force) throws IOException { -2137 return flushcache(force, false); -2138 } -2139 -2140 /** -2141 * Flush the cache. -2142 * -2143 * When this method is called the cache will be flushed unless: -2144 * <ol> -2145 * <li>the cache is empty</li> -2146 * <li>the region is closed.</li> -2147 * <li>a flush is already in progress</li> -2148 * <li>writes are disabled</li> -2149 * </ol> -2150 * -2151 * <p>This method may block for some time, so it should not be called from a -2152 * time-sensitive thread. -2153 * @param forceFlushAllStores whether we want to flush all stores -2154 * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL -2155 * @return whether the flush is success and whether the region needs compacting -2156 * -2157 * @throws IOException general io exceptions -2158 * @throws DroppedSnapshotException Thrown when replay of wal is required -2159 * because a Snapshot was not properly persisted. The region is put in closing mode, and the -2160 * caller MUST abort after this. -2161 */ -2162 public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) -2163 throws IOException { -2164 // fail-fast instead of waiting on the lock -2165 if (this.closing.get()) { -2166 String msg = "Skipping flush on " + this + " because closing"; -2167 LOG.debug(msg); -2168 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); -2169 } -2170 MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); -2171 status.setStatus("Acquiring readlock on region"); -2172 // block waiting for the lock for flushing cache -2173 lock.readLock().lock(); -2174 try { -2175 if (this.closed.get()) { -2176 String msg = "Skipping flush on " + this + " because closed"; -2177 LOG.debug(msg); -2178 status.abort(msg); -2179 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); -2180 } -2181 if (coprocessorHost != null) { -2182 status.setStatus("Running coprocessor pre-flush hooks"); -2183 coprocessorHost.preFlush(); -2184 } -2185 // TODO: this should be managed within memstore with the snapshot, updated only after flush -2186 // successful -2187 if (numMutationsWithoutWAL.sum() > 0) { -2188 numMutationsWithoutWAL.reset(); -2189 dataInMemoryWithoutWAL.reset(); -2190 } -2191 synchronized (writestate) { -2192 if (!writestate.flushing && writestate.writesEnabled) { -2193 this.writestate.flushing = true; -2194 } else { -2195 if (LOG.isDebugEnabled()) { -2196 LOG.debug("NOT flushing memstore for region " + this -2197 + ", flushing=" + writestate.flushing + ", writesEnabled=" -2198 + writestate.writesEnabled); -2199 } -2200 String msg = "Not flushing since " -2201 + (writestate.flushing ? "already flushing" -2202 : "writes not enabled"); -2203 status.abort(msg); -2204 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); -2205 } -2206 } -2207 -2208 try { -2209 Collection<Store> specificStoresToFlush = -2210 forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); -2211 FlushResult fs = internalFlushcache(specificStoresToFlush, -2212 status, writeFlushRequestWalMarker); -2213 -2214 if (coprocessorHost != null) { -2215 status.setStatus("Running post-flush coprocessor hooks"); -2216 coprocessorHost.postFlush(); -2217 } -2218 -2219 status.markComplete("Flush successful"); -2220 return fs; -2221 } finally { -2222 synchronized (writestate) { -2223 writestate.flushing = false; -2224 this.writestate.flushRequested = false; -2225 writestate.notifyAll(); -2226 } -2227 } -2228 } finally { -2229 lock.readLock().unlock(); -2230 status.cleanup(); -2231 } -2232 } -2233 -2234 /** -2235 * Should the store be flushed because it is old enough. -2236 * <p> -2237 * Every FlushPolicy should call this to determine whether a store is old enough to flush (except -2238 * that you always flush all stores). Otherwise the method will always -2239 * returns true which will make a lot of flush requests. -2240 */ -2241 boolean shouldFlushStore(Store store) { -2242 long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), -2243 store.getFamily().getName()) - 1; -2244 if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) { -2245 if (LOG.isDebugEnabled()) { -2246 LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " + -2247 getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest + -2248 " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint()); -2249 } -2250 return true; -2251 } -2252 if (this.flushCheckInterval <= 0) { -2253 return false; -2254 } -2255 long now = EnvironmentEdgeManager.currentTime(); -2256 if (store.timeOfOldestEdit() < now - this.flushCheckInterval) { -2257 if (LOG.isDebugEnabled()) { -2258 LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " + -2259 getRegionInfo().getEncodedName() + " because time of oldest edit=" + -2260 store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now); -2261 } -2262 return true; -2263 } -2264 return false; -2265 } -2266 -2267 /** -2268 * Should the memstore be flushed now -2269 */ -2270 boolean shouldFlush(final StringBuffer whyFlush) { -2271 whyFlush.setLength(0); -2272 // This is a rough measure. -2273 if (this.maxFlushedSeqId > 0 -2274 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) { -2275 whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush"); -2276 return true; -2277 } -2278 long modifiedFlushCheckInterval = flushCheckInterval; -2279 if (getRegionInfo().isSystemTable() && -2280 getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { -2281 modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL; +1997 } +1998 +1999 /* +2000 * Called by compaction thread and after region is opened to compact the +2001 * HStores if necessary. +2002 * +2003 * <p>This operation could block for a long time, so don't call it from a +2004 * time-sensitive thread. +2005 * +2006 * Note that no locking is necessary at this level because compaction only +2007 * conflicts with a region split, and that cannot happen because the region +2008 * server does them sequentially and not in parallel. +2009 * +2010 * @param compaction Compaction details, obtained by requestCompaction() +2011 * @param throughputController +2012 * @return whether the compaction completed +2013 */ +2014 public boolean compact(CompactionContext compaction, Store store, +2015 ThroughputController throughputController) throws IOException { +2016 return compact(compaction, store, throughputController, null); +2017 } +2018 +2019 public boolean compact(CompactionContext compaction, Store store, +2020 ThroughputController throughputController, User user) throws IOException { +2021 assert compaction != null && compaction.hasSelection(); +2022 assert !compaction.getRequest().getFiles().isEmpty(); +2023 if (this.closing.get() || this.closed.get()) { +2024 LOG.debug("Skipping compaction on " + this + " because closing/closed"); +2025 store.cancelRequestedCompaction(compaction); +2026 return false; +2027 } +2028 MonitoredTask status = null; +2029 boolean requestNeedsCancellation = true; +2030 /* +2031 * We are trying to remove / relax the region read lock for compaction. +2032 * Let's see what are the potential race conditions among the operations (user scan, +2033 * region split, region close and region bulk load). +2034 * +2035 * user scan ---> region read lock +2036 * region split --> region close first --> region write lock +2037 * region close --> region write lock +2038 * region bulk load --> region write lock +2039 * +2040 * read lock is compatible with read lock. ---> no problem with user scan/read +2041 * region bulk load does not cause problem for compaction (no consistency problem, store lock +2042 * will help the store file accounting). +2043 * They can run almost concurrently at the region level. +2044 * +2045 * The only remaining race condition is between the region close and compaction. +2046 * So we will evaluate, below, how region close intervenes with compaction if compaction does +2047 * not acquire region read lock. +2048 * +2049 * Here are the steps for compaction: +2050 * 1. obtain list of StoreFile's +2051 * 2. create StoreFileScanner's based on list from #1 +2052 * 3. perform compaction and save resulting files under tmp dir +2053 * 4. swap in compacted files +2054 * +2055 * #1 is guarded by store lock. This patch does not change this --> no worse or better +2056 * For #2, we obtain smallest read point (for region) across all the Scanners (for both default +2057 * compactor and stripe compactor). +2058 * The read points are for user scans. Region keeps the read points for all currently open +2059 * user scanners. +2060 * Compaction needs to know the smallest read point so that during re-write of the hfiles, +2061 * it can remove the mvcc points for the cells if their mvccs are older than the smallest +2062 * since they are not needed anymore. +2063 * This will not conflict with compaction. +2064 * For #3, it can be performed in parallel to other operations. +2065 * For #4 bulk load and compaction don't conflict with each other on the region level +2066 * (for multi-family atomicy). +2067 * Region close and compaction are guarded pretty well by the 'writestate'. +2068 * In HRegion#doClose(), we have : +2069 * synchronized (writestate) { +2070 * // Disable compacting and flushing by background threads for this +2071 * // region. +2072 * canFlush = !writestate.readOnly; +2073 * writestate.writesEnabled = false; +2074 * LOG.debug("Closing " + this + ": disabling compactions & flushes"); +2075 * waitForFlushesAndCompactions(); +2076 * } +2077 * waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0. +2078 * and in HRegion.compact() +2079 * try { +2080 * synchronized (writestate) { +2081 * if (writestate.writesEnabled) { +2082 * wasStateSet = true; +2083 * ++writestate.compacting; +2084 * } else { +2085 * String msg = "NOT compacting region " + this + ". Writes disabled."; +2086 * LOG.info(msg); +2087 * status.abort(msg); +2088 * return false; +2089 * } +2090 * } +2091 * Also in compactor.performCompaction(): +2092 * check periodically to see if a system stop is requested +2093 * if (closeCheckInterval > 0) { +2094 * bytesWritten += len; +2095 * if (bytesWritten > closeCheckInterval) { +2096 * bytesWritten = 0; +2097 * if (!store.areWritesEnabled()) { +2098 * progress.cancel(); +2099 * return false; +2100 * } +2101 * } +2102 * } +2103 */ +2104 try { +2105 byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); +2106 if (stores.get(cf) != store) { +2107 LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this +2108 + " has been re-instantiated, cancel this compaction request. " +2109 + " It may be caused by the roll back of split transaction"); +2110 return false; +2111 } +2112 +2113 status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); +2114 if (this.closed.get()) { +2115 String msg = "Skipping compaction on " + this + " because closed"; +2116 LOG.debug(msg); +2117 status.abort(msg); +2118 return false; +2119 }