Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3B1D3200D35 for ; Mon, 23 Oct 2017 17:16:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 39E92160C01; Mon, 23 Oct 2017 15:16:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5D072160BF8 for ; Mon, 23 Oct 2017 17:16:08 +0200 (CEST) Received: (qmail 97862 invoked by uid 500); 23 Oct 2017 15:16:06 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 95270 invoked by uid 99); 23 Oct 2017 15:16:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Oct 2017 15:16:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2CC71DFAE0; Mon, 23 Oct 2017 15:16:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Mon, 23 Oct 2017 15:16:17 -0000 Message-Id: <33c7963cd9604052ac0328df2ad9ba77@git.apache.org> In-Reply-To: <8fe6e835609548138689d9b3cbcab70b@git.apache.org> References: <8fe6e835609548138689d9b3cbcab70b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/51] [partial] hbase-site git commit: Published site at . archived-at: Mon, 23 Oct 2017 15:16:12 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/41a7fcc5/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.PrepareFlushResult.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.PrepareFlushResult.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.PrepareFlushResult.html index 12fe16f..b1e0997 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.PrepareFlushResult.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.PrepareFlushResult.html @@ -1960,6279 +1960,6285 @@ 1952 protected void doRegionCompactionPrep() throws IOException { 1953 } 1954 -1955 @Override -1956 public void triggerMajorCompaction() throws IOException { -1957 stores.values().forEach(HStore::triggerMajorCompaction); -1958 } -1959 -1960 /** -1961 * Synchronously compact all stores in the region. -1962 * <p>This operation could block for a long time, so don't call it from a -1963 * time-sensitive thread. -1964 * <p>Note that no locks are taken to prevent possible conflicts between -1965 * compaction and splitting activities. The regionserver does not normally compact -1966 * and split in parallel. However by calling this method you may introduce -1967 * unexpected and unhandled concurrency. Don't do this unless you know what -1968 * you are doing. -1969 * -1970 * @param majorCompaction True to force a major compaction regardless of thresholds -1971 * @throws IOException -1972 */ -1973 public void compact(boolean majorCompaction) throws IOException { -1974 if (majorCompaction) { -1975 triggerMajorCompaction(); -1976 } -1977 for (HStore s : stores.values()) { -1978 Optional<CompactionContext> compaction = s.requestCompaction(); -1979 if (compaction.isPresent()) { -1980 ThroughputController controller = null; -1981 if (rsServices != null) { -1982 controller = CompactionThroughputControllerFactory.create(rsServices, conf); -1983 } -1984 if (controller == null) { -1985 controller = NoLimitThroughputController.INSTANCE; -1986 } -1987 compact(compaction.get(), s, controller, null); -1988 } -1989 } -1990 } -1991 -1992 /** -1993 * This is a helper function that compact all the stores synchronously. -1994 * <p> -1995 * It is used by utilities and testing -1996 */ -1997 @VisibleForTesting -1998 public void compactStores() throws IOException { -1999 for (HStore s : stores.values()) { -2000 Optional<CompactionContext> compaction = s.requestCompaction(); -2001 if (compaction.isPresent()) { -2002 compact(compaction.get(), s, NoLimitThroughputController.INSTANCE, null); -2003 } -2004 } -2005 } -2006 -2007 /** -2008 * This is a helper function that compact the given store. -2009 * <p> -2010 * It is used by utilities and testing -2011 */ -2012 @VisibleForTesting -2013 void compactStore(byte[] family, ThroughputController throughputController) throws IOException { -2014 HStore s = getStore(family); -2015 Optional<CompactionContext> compaction = s.requestCompaction(); -2016 if (compaction.isPresent()) { -2017 compact(compaction.get(), s, throughputController, null); -2018 } -2019 } -2020 -2021 /** -2022 * Called by compaction thread and after region is opened to compact the -2023 * HStores if necessary. -2024 * -2025 * <p>This operation could block for a long time, so don't call it from a -2026 * time-sensitive thread. -2027 * -2028 * Note that no locking is necessary at this level because compaction only -2029 * conflicts with a region split, and that cannot happen because the region -2030 * server does them sequentially and not in parallel. -2031 * -2032 * @param compaction Compaction details, obtained by requestCompaction() -2033 * @param throughputController -2034 * @return whether the compaction completed -2035 */ +1955 /** +1956 * Synchronously compact all stores in the region. +1957 * <p>This operation could block for a long time, so don't call it from a +1958 * time-sensitive thread. +1959 * <p>Note that no locks are taken to prevent possible conflicts between +1960 * compaction and splitting activities. The regionserver does not normally compact +1961 * and split in parallel. However by calling this method you may introduce +1962 * unexpected and unhandled concurrency. Don't do this unless you know what +1963 * you are doing. +1964 * +1965 * @param majorCompaction True to force a major compaction regardless of thresholds +1966 * @throws IOException +1967 */ +1968 public void compact(boolean majorCompaction) throws IOException { +1969 if (majorCompaction) { +1970 stores.values().forEach(HStore::triggerMajorCompaction); +1971 } +1972 for (HStore s : stores.values()) { +1973 Optional<CompactionContext> compaction = s.requestCompaction(); +1974 if (compaction.isPresent()) { +1975 ThroughputController controller = null; +1976 if (rsServices != null) { +1977 controller = CompactionThroughputControllerFactory.create(rsServices, conf); +1978 } +1979 if (controller == null) { +1980 controller = NoLimitThroughputController.INSTANCE; +1981 } +1982 compact(compaction.get(), s, controller, null); +1983 } +1984 } +1985 } +1986 +1987 /** +1988 * This is a helper function that compact all the stores synchronously. +1989 * <p> +1990 * It is used by utilities and testing +1991 */ +1992 @VisibleForTesting +1993 public void compactStores() throws IOException { +1994 for (HStore s : stores.values()) { +1995 Optional<CompactionContext> compaction = s.requestCompaction(); +1996 if (compaction.isPresent()) { +1997 compact(compaction.get(), s, NoLimitThroughputController.INSTANCE, null); +1998 } +1999 } +2000 } +2001 +2002 /** +2003 * This is a helper function that compact the given store. +2004 * <p> +2005 * It is used by utilities and testing +2006 */ +2007 @VisibleForTesting +2008 void compactStore(byte[] family, ThroughputController throughputController) throws IOException { +2009 HStore s = getStore(family); +2010 Optional<CompactionContext> compaction = s.requestCompaction(); +2011 if (compaction.isPresent()) { +2012 compact(compaction.get(), s, throughputController, null); +2013 } +2014 } +2015 +2016 /** +2017 * Called by compaction thread and after region is opened to compact the +2018 * HStores if necessary. +2019 * +2020 * <p>This operation could block for a long time, so don't call it from a +2021 * time-sensitive thread. +2022 * +2023 * Note that no locking is necessary at this level because compaction only +2024 * conflicts with a region split, and that cannot happen because the region +2025 * server does them sequentially and not in parallel. +2026 * +2027 * @param compaction Compaction details, obtained by requestCompaction() +2028 * @param throughputController +2029 * @return whether the compaction completed +2030 */ +2031 public boolean compact(CompactionContext compaction, HStore store, +2032 ThroughputController throughputController) throws IOException { +2033 return compact(compaction, store, throughputController, null); +2034 } +2035 2036 public boolean compact(CompactionContext compaction, HStore store, -2037 ThroughputController throughputController) throws IOException { -2038 return compact(compaction, store, throughputController, null); -2039 } -2040 -2041 public boolean compact(CompactionContext compaction, HStore store, -2042 ThroughputController throughputController, User user) throws IOException { -2043 assert compaction != null && compaction.hasSelection(); -2044 assert !compaction.getRequest().getFiles().isEmpty(); -2045 if (this.closing.get() || this.closed.get()) { -2046 LOG.debug("Skipping compaction on " + this + " because closing/closed"); -2047 store.cancelRequestedCompaction(compaction); -2048 return false; -2049 } -2050 MonitoredTask status = null; -2051 boolean requestNeedsCancellation = true; -2052 /* -2053 * We are trying to remove / relax the region read lock for compaction. -2054 * Let's see what are the potential race conditions among the operations (user scan, -2055 * region split, region close and region bulk load). +2037 ThroughputController throughputController, User user) throws IOException { +2038 assert compaction != null && compaction.hasSelection(); +2039 assert !compaction.getRequest().getFiles().isEmpty(); +2040 if (this.closing.get() || this.closed.get()) { +2041 LOG.debug("Skipping compaction on " + this + " because closing/closed"); +2042 store.cancelRequestedCompaction(compaction); +2043 return false; +2044 } +2045 MonitoredTask status = null; +2046 boolean requestNeedsCancellation = true; +2047 /* +2048 * We are trying to remove / relax the region read lock for compaction. +2049 * Let's see what are the potential race conditions among the operations (user scan, +2050 * region split, region close and region bulk load). +2051 * +2052 * user scan ---> region read lock +2053 * region split --> region close first --> region write lock +2054 * region close --> region write lock +2055 * region bulk load --> region write lock 2056 * -2057 * user scan ---> region read lock -2058 * region split --> region close first --> region write lock -2059 * region close --> region write lock -2060 * region bulk load --> region write lock +2057 * read lock is compatible with read lock. ---> no problem with user scan/read +2058 * region bulk load does not cause problem for compaction (no consistency problem, store lock +2059 * will help the store file accounting). +2060 * They can run almost concurrently at the region level. 2061 * -2062 * read lock is compatible with read lock. ---> no problem with user scan/read -2063 * region bulk load does not cause problem for compaction (no consistency problem, store lock -2064 * will help the store file accounting). -2065 * They can run almost concurrently at the region level. -2066 * -2067 * The only remaining race condition is between the region close and compaction. -2068 * So we will evaluate, below, how region close intervenes with compaction if compaction does -2069 * not acquire region read lock. -2070 * -2071 * Here are the steps for compaction: -2072 * 1. obtain list of StoreFile's -2073 * 2. create StoreFileScanner's based on list from #1 -2074 * 3. perform compaction and save resulting files under tmp dir -2075 * 4. swap in compacted files -2076 * -2077 * #1 is guarded by store lock. This patch does not change this --> no worse or better -2078 * For #2, we obtain smallest read point (for region) across all the Scanners (for both default -2079 * compactor and stripe compactor). -2080 * The read points are for user scans. Region keeps the read points for all currently open -2081 * user scanners. -2082 * Compaction needs to know the smallest read point so that during re-write of the hfiles, -2083 * it can remove the mvcc points for the cells if their mvccs are older than the smallest -2084 * since they are not needed anymore. -2085 * This will not conflict with compaction. -2086 * For #3, it can be performed in parallel to other operations. -2087 * For #4 bulk load and compaction don't conflict with each other on the region level -2088 * (for multi-family atomicy). -2089 * Region close and compaction are guarded pretty well by the 'writestate'. -2090 * In HRegion#doClose(), we have : -2091 * synchronized (writestate) { -2092 * // Disable compacting and flushing by background threads for this -2093 * // region. -2094 * canFlush = !writestate.readOnly; -2095 * writestate.writesEnabled = false; -2096 * LOG.debug("Closing " + this + ": disabling compactions & flushes"); -2097 * waitForFlushesAndCompactions(); -2098 * } -2099 * waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0. -2100 * and in HRegion.compact() -2101 * try { -2102 * synchronized (writestate) { -2103 * if (writestate.writesEnabled) { -2104 * wasStateSet = true; -2105 * ++writestate.compacting; -2106 * } else { -2107 * String msg = "NOT compacting region " + this + ". Writes disabled."; -2108 * LOG.info(msg); -2109 * status.abort(msg); -2110 * return false; -2111 * } -2112 * } -2113 * Also in compactor.performCompaction(): -2114 * check periodically to see if a system stop is requested -2115 * if (closeCheckInterval > 0) { -2116 * bytesWritten += len; -2117 * if (bytesWritten > closeCheckInterval) { -2118 * bytesWritten = 0; -2119 * if (!store.areWritesEnabled()) { -2120 * progress.cancel(); -2121 * return false; -2122 * } -2123 * } -2124 * } -2125 */ -2126 try { -2127 byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); -2128 if (stores.get(cf) != store) { -2129 LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this -2130 + " has been re-instantiated, cancel this compaction request. " -2131 + " It may be caused by the roll back of split transaction"); -2132 return false; -2133 } -2134 -2135 status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); -2136 if (this.closed.get()) { -2137 String msg = "Skipping compaction on " + this + " because closed"; -2138 LOG.debug(msg); -2139 status.abort(msg); -2140 return false; -2141 } -2142 boolean wasStateSet = false; -2143 try { -2144 synchronized (writestate) { -2145 if (writestate.writesEnabled) { -2146 wasStateSet = true; -2147 writestate.compacting.incrementAndGet(); -2148 } else { -2149 String msg = "NOT compacting region " + this + ". Writes disabled."; -2150 LOG.info(msg); -2151 status.abort(msg); -2152 return false; -2153 } -2154 } -2155 LOG.info("Starting compaction on " + store + " in region " + this -2156 + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":"")); -2157 doRegionCompactionPrep(); -2158 try { -2159 status.setStatus("Compacting store " + store); -2160 // We no longer need to cancel the request on the way out of this -2161 // method because Store#compact will clean up unconditionally -2162 requestNeedsCancellation = false; -2163 store.compact(compaction, throughputController, user); -2164 } catch (InterruptedIOException iioe) { -2165 String msg = "compaction interrupted"; -2166 LOG.info(msg, iioe); -2167 status.abort(msg); -2168 return false; -2169 } -2170 } finally { -2171 if (wasStateSet) { -2172 synchronized (writestate) { -2173 writestate.compacting.decrementAndGet(); -2174 if (writestate.compacting.get() <= 0) { -2175 writestate.notifyAll(); -2176 } -2177 } -2178 } -2179 } -2180 status.markComplete("Compaction complete"); -2181 return true; -2182 } finally { -2183 if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); -2184 if (status != null) status.cleanup(); -2185 } -2186 } -2187 -2188 /** -2189 * Flush the cache. -2190 * -2191 * <p>When this method is called the cache will be flushed unless: -2192 * <ol> -2193 * <li>the cache is empty</li> -2194 * <li>the region is closed.</li> -2195 * <li>a flush is already in progress</li> -2196 * <li>writes are disabled</li> -2197 * </ol> -2198 * -2199 * <p>This method may block for some time, so it should not be called from a -2200 * time-sensitive thread. -2201 * @param force whether we want to force a flush of all stores -2202 * @return FlushResult indicating whether the flush was successful or not and if -2203 * the region needs compacting -2204 * -2205 * @throws IOException general io exceptions -2206 * because a snapshot was not properly persisted. -2207 */ -2208 // TODO HBASE-18905. We might have to expose a requestFlush API for CPs -2209 public FlushResult flush(boolean force) throws IOException { -2210 return flushcache(force, false); -2211 } -2212 -2213 public static interface FlushResult { -2214 enum Result { -2215 FLUSHED_NO_COMPACTION_NEEDED, -2216 FLUSHED_COMPACTION_NEEDED, -2217 // Special case where a flush didn't run because there's nothing in the memstores. Used when -2218 // bulk loading to know when we can still load even if a flush didn't happen. -2219 CANNOT_FLUSH_MEMSTORE_EMPTY, -2220 CANNOT_FLUSH -2221 } -2222 -2223 /** @return the detailed result code */ -2224 Result getResult(); -2225 -2226 /** @return true if the memstores were flushed, else false */ -2227 boolean isFlushSucceeded(); -2228 -2229 /** @return True if the flush requested a compaction, else false */ -2230 boolean isCompactionNeeded(); -2231 } -2232 -2233 /** -2234 * Flush the cache. -2235 * -2236 * When this method is called the cache will be flushed unless: -2237 * <ol> -2238 * <li>the cache is empty</li> -2239 * <li>the region is closed.</li> -2240 * <li>a flush is already in progress</li> -2241 * <li>writes are disabled</li> -2242 * </ol> -2243 * -2244 * <p>This method may block for some time, so it should not be called from a -2245 * time-sensitive thread. -2246 * @param forceFlushAllStores whether we want to flush all stores -2247 * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL -2248 * @return whether the flush is success and whether the region needs compacting -2249 * -2250 * @throws IOException general io exceptions -2251 * @throws DroppedSnapshotException Thrown when replay of wal is required -2252 * because a Snapshot was not properly persisted. The region is put in closing mode, and the -2253 * caller MUST abort after this. -2254 */ -2255 public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) -2256 throws IOException { -2257 // fail-fast instead of waiting on the lock -2258 if (this.closing.get()) { -2259 String msg = "Skipping flush on " + this + " because closing"; -2260 LOG.debug(msg); -2261 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); -2262 } -2263 MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); -2264 status.setStatus("Acquiring readlock on region"); -2265 // block waiting for the lock for flushing cache -2266 lock.readLock().lock(); -2267 try { -2268 if (this.closed.get()) { -2269 String msg = "Skipping flush on " + this + " because closed"; -2270 LOG.debug(msg); -2271 status.abort(msg); -2272 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); -2273 } -2274 if (coprocessorHost != null) { -2275 status.setStatus("Running coprocessor pre-flush hooks"); -2276 coprocessorHost.preFlush(); -2277 } -2278 // TODO: this should be managed within memstore with the snapshot, updated only after flush -2279 // successful -2280 if (numMutationsWithoutWAL.sum() > 0) { -2281 numMutationsWithoutWAL.reset(); -2282 dataInMemoryWithoutWAL.reset(); -2283 } -2284 synchronized (writestate) { -2285 if (!writestate.flushing && writestate.writesEnabled) { -2286 this.writestate.flushing = true; -2287 } else { -2288 if (LOG.isDebugEnabled()) { -2289 LOG.debug("NOT flushing memstore for region " + this -2290 + ", flushing=" + writestate.flushing + ", writesEnabled=" -2291 + writestate.writesEnabled); -2292 } -2293 String msg = "Not flushing since " -2294 + (writestate.flushing ? "already flushing" -2295 : "writes not enabled"); -2296 status.abort(msg); -2297 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); -2298 } -2299 } -2300 -2301 try { -2302 Collection<HStore> specificStoresToFlush = -2303 forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); -2304 FlushResultImpl fs = -2305 internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker); +2062 * The only remaining race condition is between the region close and compaction. +2063 * So we will evaluate, below, how region close intervenes with compaction if compaction does +2064 * not acquire region read lock. +2065 * +2066 * Here are the steps for compaction: +2067 * 1. obtain list of StoreFile's +2068 * 2. create StoreFileScanner's based on list from #1 +2069 * 3. perform compaction and save resulting files under tmp dir +2070 * 4. swap in compacted files +2071 * +2072 * #1 is guarded by store lock. This patch does not change this --> no worse or better +2073 * For #2, we obtain smallest read point (for region) across all the Scanners (for both default +2074 * compactor and stripe compactor). +2075 * The read points are for user scans. Region keeps the read points for all currently open +2076 * user scanners. +2077 * Compaction needs to know the smallest read point so that during re-write of the hfiles, +2078 * it can remove the mvcc points for the cells if their mvccs are older than the smallest +2079 * since they are not needed anymore. +2080 * This will not conflict with compaction. +2081 * For #3, it can be performed in parallel to other operations. +2082 * For #4 bulk load and compaction don't conflict with each other on the region level +2083 * (for multi-family atomicy). +2084 * Region close and compaction are guarded pretty well by the 'writestate'. +2085 * In HRegion#doClose(), we have : +2086 * synchronized (writestate) { +2087 * // Disable compacting and flushing by background threads for this +2088 * // region. +2089 * canFlush = !writestate.readOnly; +2090 * writestate.writesEnabled = false; +2091 * LOG.debug("Closing " + this + ": disabling compactions & flushes"); +2092 * waitForFlushesAndCompactions(); +2093 * } +2094 * waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0. +2095 * and in HRegion.compact() +2096 * try { +2097 * synchronized (writestate) { +2098 * if (writestate.writesEnabled) { +2099 * wasStateSet = true; +2100 * ++writestate.compacting; +2101 * } else { +2102 * String msg = "NOT compacting region " + this + ". Writes disabled."; +2103 * LOG.info(msg); +2104 * status.abort(msg); +2105 * return false; +2106 * } +2107 * } +2108 * Also in compactor.performCompaction(): +2109 * check periodically to see if a system stop is requested +2110 * if (closeCheckInterval > 0) { +2111 * bytesWritten += len; +2112 * if (bytesWritten > closeCheckInterval) { +2113 * bytesWritten = 0; +2114 * if (!store.areWritesEnabled()) { +2115 * progress.cancel(); +2116 * return false; +2117 * } +2118 * } +2119 * } +2120 */ +2121 try { +2122 byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); +2123 if (stores.get(cf) != store) { +2124 LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this +2125 + " has been re-instantiated, cancel this compaction request. " +2126 + " It may be caused by the roll back of split transaction"); +2127 return false; +2128 } +2129 +2130 status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); +2131 if (this.closed.get()) { +2132 String msg = "Skipping compaction on " + this + " because closed"; +2133 LOG.debug(msg); +2134 status.abort(msg); +2135 return false; +2136 } +2137 boolean wasStateSet = false; +2138 try { +2139 synchronized (writestate) { +2140 if (writestate.writesEnabled) { +2141 wasStateSet = true; +2142 writestate.compacting.incrementAndGet(); +2143 } else { +2144 String msg = "NOT compacting region " + this + ". Writes disabled."; +2145 LOG.info(msg); +2146 status.abort(msg); +2147 return false; +2148 } +2149 } +2150 LOG.info("Starting compaction on " + store + " in region " + this +2151 + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":"")); +2152 doRegionCompactionPrep(); +2153 try { +2154 status.setStatus("Compacting store " + store); +2155 // We no longer need to cancel the request on the way out of this +2156 // method because Store#compact will clean up unconditionally +2157 requestNeedsCancellation = false; +2158 store.compact(compaction, throughputController, user); +2159 } catch (InterruptedIOException iioe) { +2160 String msg = "compaction interrupted"; +2161 LOG.info(msg, iioe); +2162 status.abort(msg); +2163 return false; +2164 } +2165 } finally { +2166 if (wasStateSet) { +2167 synchronized (writestate) { +2168 writestate.compacting.decrementAndGet(); +2169 if (writestate.compacting.get() <= 0) { +2170 writestate.notifyAll(); +2171 } +2172 } +2173 } +2174 } +2175 status.markComplete("Compaction complete"); +2176 return true; +2177 } finally { +2178 if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); +2179 if (status != null) status.cleanup(); +2180 } +2181 } +2182 +2183 /** +2184 * Flush the cache. +2185 * +2186 * <p>When this method is called the cache will be flushed unless: +2187 * <ol> +2188 * <li>the cache is empty</li> +2189 * <li>the region is closed.</li> +2190 * <li>a flush is already in progress</li> +2191 * <li>writes are disabled</li> +2192 * </ol> +2193 * +2194 * <p>This method may block for some time, so it should not be called from a +2195 * time-sensitive thread. +2196 * @param force whether we want to force a flush of all stores +2197 * @return FlushResult indicating whether the flush was successful or not and if +2198 * the region needs compacting +2199 * +2200 * @throws IOException general io exceptions +2201 * because a snapshot was not properly persisted. +2202 */ +2203 // TODO HBASE-18905. We might have to expose a requestFlush API for CPs +2204 public FlushResult flush(boolean force) throws IOException { +2205 return flushcache(force, false); +2206 } +2207 +2208 public static interface FlushResult { +2209 enum Result { +2210 FLUSHED_NO_COMPACTION_NEEDED, +2211 FLUSHED_COMPACTION_NEEDED, +2212 // Special case where a flush didn't run because there's nothing in the memstores. Used when +2213 // bulk loading to know when we can still load even if a flush didn't happen. +2214 CANNOT_FLUSH_MEMSTORE_EMPTY, +2215 CANNOT_FLUSH +2216 } +2217 +2218 /** @return the detailed result code */ +2219 Result getResult(); +2220 +2221 /** @return true if the memstores were flushed, else false */ +2222 boolean isFlushSucceeded(); +2223 +2224 /** @return True if the flush requested a compaction, else false */ +2225 boolean isCompactionNeeded(); +2226 } +2227 +2228 /** +2229 * Flush the cache. +2230 * +2231 * When this method is called the cache will be flushed unless: +2232 * <ol> +2233 * <li>the cache is empty</li> +2234 * <li>the region is closed.</li> +2235 * <li>a flush is already in progress</li> +2236 * <li>writes are disabled</li> +2237 * </ol> +2238 * +2239 * <p>This method may block for some time, so it should not be called from a +2240 * time-sensitive thread. +2241 * @param forceFlushAllStores whether we want to flush all stores +2242 * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL +2243 * @return whether the flush is success and whether the region needs compacting +2244 * +2245 * @throws IOException general io exceptions +2246 * @throws DroppedSnapshotException Thrown when replay of wal is required +2247 * because a Snapshot was not properly persisted. The region is put in closing mode, and the +2248 * caller MUST abort after this. +2249 */ +2250 public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) +2251 throws IOException { +2252 // fail-fast instead of waiting on the lock +2253 if (this.closing.get()) { +2254 String msg = "Skipping flush on " + this + " because closing"; +2255 LOG.debug(msg); +2256 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); +2257 } +2258 MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); +2259 status.setStatus("Acquiring readlock on region"); +2260 // block waiting for the lock for flushing cache +2261 lock.readLock().lock(); +2262 try { +2263 if (this.closed.get()) { +2264 String msg = "Skipping flush on " + this + " because closed"; +2265 LOG.debug(msg); +2266 status.abort(msg); +2267 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); +2268 } +2269 if (coprocessorHost != null) { +2270 status.setStatus("Running coprocessor pre-flush hooks"); +2271 coprocessorHost.preFlush(); +2272 } +2273 // TODO: this should be managed within memstore with the snapshot, updated only after flush +2274 // successful +2275 if (numMutationsWithoutWAL.sum() > 0) { +2276 numMutationsWithoutWAL.reset(); +2277 dataInMemoryWithoutWAL.reset(); +2278 } +2279 synchronized (writestate) { +2280 if (!writestate.flushing && writestate.writesEnabled) { +2281 this.writestate.flushing = true; +2282 } else { +2283 if (LOG.isDebugEnabled()) { +2284 LOG.debug("NOT flushing memstore for region " + this +2285 + ", flushing=" + writestate.flushing + ", writesEnabled=" +2286 + writestate.writesEnabled); +2287 } +2288 String msg = "Not flushing since " +2289 + (writestate.flushing ? "already flushing" +2290 : "writes not enabled"); +2291 status.abort(msg); +2292 return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); +2293 } +2294 } +2295 +2296 try { +2297 Collection<HStore> specificStoresToFlush = +2298 forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); +2299 FlushResultImpl fs = +2300 internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker); +2301 +2302 if (coprocessorHost != null) { +2303 status.setStatus("Running post-flush coprocessor hooks"); +2304 coprocessorHost.postFlush(); +2305 } 2306 -2307 if (coprocessorHost != null) { -2308 status.setStatus("Running post-flush coprocessor hooks"); -2309 coprocessorHost.postFlush(); -2310 } -2311 -2312 if(fs.isFlushSucceeded()) { -2313 flushesQueued.reset(); -2314 } -2315 -2316 status.markComplete("Flush successful"); -2317 return fs; -2318 } finally { -2319 synchronized (writestate) { -2320 writestate.flushing = false; -2321 this.writestate.flushRequested = false; -2322 writestate.notifyAll(); -2323 } -2324 } -2325 } finally { -2326 lock.readLock().unlock(); -2327 status.cleanup(); -2328 } -2329 } -2330 -2331 /** -2332 * Should the store be flushed because it is old enough. -2333 * <p> -2334 * Every FlushPolicy should call this to determine whether a store is old enough to flush (except -2335 * that you always flush all stores). Otherwise the method will always -2336 * returns true which will make a lot of flush requests. -2337 */ -2338 boolean shouldFlushStore(HStore store) { -2339 long earliest = this.wal.getEarliestMemStoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), -2340 store.getColumnFamilyDescriptor().getName()) - 1; -2341 if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) { -2342 if (LOG.isDebugEnabled()) { -2343 LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " + -2344 getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest + -2345 " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint()); -2346 } -2347 return true; -2348 } -2349 if (this.flushCheckInterval <= 0) { -2350 return false; -2351 } -2352 long now = EnvironmentEdgeManager.currentTime(); -2353 if (store.timeOfOldestEdit() < now - this.flushCheckInterval) { -2354 if (LOG.isDebugEnabled()) { -2355 LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " + -2356 getRegionInfo().getEncodedName() + " because time of oldest edit=" + -2357 store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now); -2358 } -2359 return true; -2360 } -2361 return false; -2362 } -2363 -2364 /** -2365 * Should the memstore be flushed now -2366 */ -2367 boolean shouldFlush(final StringBuffer whyFlush) { -2368 whyFlush.setLength(0); -2369 // This is a rough measure. -2370 if (this.maxFlushedSeqId > 0 -2371 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) { -2372 whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush"); -2373 return true; +2307 if(fs.isFlushSucceeded()) { +2308 flushesQueued.reset(); +2309 } +2310 +2311 status.markComplete("Flush successful"); +2312 return fs; +2313 } finally { +2314 synchronized (writestate) { +2315 writestate.flushing = false; +2316 this.writestate.flushRequested = false; +2317 writestate.notifyAll(); +2318 } +2319 } +2320 } finally { +2321 lock.readLock().unlock(); +2322 status.cleanup(); +2323 } +2324 } +2325 +2326 /** +2327 * Should the store be flushed because it is old enough. +2328 * <p> +2329 * Every FlushPolicy should call this to determine whether a store is old enough to flush (except +2330 * that you always flush all stores). Otherwise the method will always +2331 * returns true which will make a lot of flush requests. +2332 */ +2333 boolean shouldFlushStore(HStore store) { +2334 long earliest = this.wal.getEarliestMemStoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), +2335 store.getColumnFamilyDescriptor().getName()) - 1; +2336 if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) { +2337 if (LOG.isDebugEnabled()) { +2338 LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " + +2339 getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest + +2340 " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint()); +2341 } +2342 return true; +2343 } +2344 if (this.flushCheckInterval <= 0) { +2345 return false; +2346 } +2347 long now = EnvironmentEdgeManager.currentTime(); +2348 if (store.timeOfOldestEdit() < now - this.flushCheckInterval) { +2349 if (LOG.isDebugEnabled()) { +2350 LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " + +2351 getRegionInfo().getEncodedName() + " because time of oldest edit=" + +2352 store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now); +2353 } +2354 return true; +2355 } +2356 return false; +2357 } +2358 +2359 /** +2360 * Should the memstore be flushed now +2361 */ +2362 boolean shouldFlush(final StringBuffer whyFlush) { +2363 whyFlush.setLength(0); +2364 // This is a rough measure. +2365 if (this.maxFlushedSeqId > 0 +2366 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) { +2367 whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush"); +2368 return true; +2369 } +2370 long modifiedFlushCheckInterval = flushCheckInterval; +2371 if (getRegionInfo().getTable().isSystemTable() && +2372 getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) { +2373 modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL; 2374 } -2375 long modifiedFlushCheckInterval = flushCheckInterval; -2376 if (getRegionInfo().getTable().isSystemTable() && -2377 getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) { -2378 modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL; -2379 } -2380 if (modifiedFlushCheckInterval <= 0) { //disabled +2375 if (modifiedFlushCheckInterval <= 0) { //disabled +2376 return false; +2377 } +2378 long now = EnvironmentEdgeManager.currentTime(); +2379 //if we flushed in the recent past, we don't need to do again now +2380 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) { 2381 return false; 2382 } -2383 long now = EnvironmentEdgeManager.currentTime(); -2384 //if we flushed in the recent past, we don't need to do again now -2385 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) { -2386 return false; -2387 } -2388 //since we didn't flush in the recent past, flush now if certain conditions -2389 //are met. Return true on first such memstore hit. -2390 for (HStore s : stores.values()) { -2391 if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) { -2392 // we have an old enough edit in the memstor