Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AED37200D28 for ; Mon, 23 Oct 2017 17:16:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AD785160BF4; Mon, 23 Oct 2017 15:16:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DC2C0160BF7 for ; Mon, 23 Oct 2017 17:16:10 +0200 (CEST) Received: (qmail 98953 invoked by uid 500); 23 Oct 2017 15:16:07 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 96333 invoked by uid 99); 23 Oct 2017 15:16:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Oct 2017 15:16:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82478DFE5C; Mon, 23 Oct 2017 15:16:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Mon, 23 Oct 2017 15:16:27 -0000 Message-Id: In-Reply-To: <8fe6e835609548138689d9b3cbcab70b@git.apache.org> References: <8fe6e835609548138689d9b3cbcab70b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [25/51] [partial] hbase-site git commit: Published site at . archived-at: Mon, 23 Oct 2017 15:16:15 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/41a7fcc5/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplit.CompactionRunner.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplit.CompactionRunner.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplit.CompactionRunner.html index 039a505..7df078f 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplit.CompactionRunner.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplit.CompactionRunner.html @@ -42,684 +42,741 @@ 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.ThreadPoolExecutor; 036import java.util.concurrent.TimeUnit; -037 -038import org.apache.commons.logging.Log; -039import org.apache.commons.logging.LogFactory; -040import org.apache.hadoop.conf.Configuration; -041import org.apache.hadoop.hbase.conf.ConfigurationManager; -042import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; -043import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; -044import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -045import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; -047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; -048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -049import org.apache.hadoop.hbase.security.User; -050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -051import org.apache.hadoop.hbase.util.StealJobQueue; -052import org.apache.hadoop.ipc.RemoteException; -053import org.apache.hadoop.util.StringUtils; -054import org.apache.yetus.audience.InterfaceAudience; -055 -056import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -057import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +037import java.util.concurrent.atomic.AtomicInteger; +038import java.util.function.IntSupplier; +039 +040import org.apache.commons.logging.Log; +041import org.apache.commons.logging.LogFactory; +042import org.apache.hadoop.conf.Configuration; +043import org.apache.hadoop.hbase.conf.ConfigurationManager; +044import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; +045import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; +046import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +047import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +048import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; +049import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +050import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +051import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +052import org.apache.hadoop.hbase.security.User; +053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +054import org.apache.hadoop.hbase.util.StealJobQueue; +055import org.apache.hadoop.ipc.RemoteException; +056import org.apache.hadoop.util.StringUtils; +057import org.apache.yetus.audience.InterfaceAudience; 058 -059/** -060 * Compact region on request and then run split if appropriate -061 */ -062@InterfaceAudience.Private -063public class CompactSplit implements PropagatingConfigurationObserver { -064 private static final Log LOG = LogFactory.getLog(CompactSplit.class); -065 -066 // Configuration key for the large compaction threads. -067 public final static String LARGE_COMPACTION_THREADS = -068 "hbase.regionserver.thread.compaction.large"; -069 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; -070 -071 // Configuration key for the small compaction threads. -072 public final static String SMALL_COMPACTION_THREADS = -073 "hbase.regionserver.thread.compaction.small"; -074 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; -075 -076 // Configuration key for split threads -077 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; -078 public final static int SPLIT_THREADS_DEFAULT = 1; -079 -080 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = -081 "hbase.regionserver.regionSplitLimit"; -082 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; -083 -084 private final HRegionServer server; -085 private final Configuration conf; +059import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +060import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +061 +062/** +063 * Compact region on request and then run split if appropriate +064 */ +065@InterfaceAudience.Private +066public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { +067 private static final Log LOG = LogFactory.getLog(CompactSplit.class); +068 +069 // Configuration key for the large compaction threads. +070 public final static String LARGE_COMPACTION_THREADS = +071 "hbase.regionserver.thread.compaction.large"; +072 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; +073 +074 // Configuration key for the small compaction threads. +075 public final static String SMALL_COMPACTION_THREADS = +076 "hbase.regionserver.thread.compaction.small"; +077 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; +078 +079 // Configuration key for split threads +080 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; +081 public final static int SPLIT_THREADS_DEFAULT = 1; +082 +083 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = +084 "hbase.regionserver.regionSplitLimit"; +085 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; 086 -087 private final ThreadPoolExecutor longCompactions; -088 private final ThreadPoolExecutor shortCompactions; -089 private final ThreadPoolExecutor splits; -090 -091 private volatile ThroughputController compactionThroughputController; -092 -093 /** -094 * Splitting should not take place if the total number of regions exceed this. -095 * This is not a hard limit to the number of regions but it is a guideline to -096 * stop splitting after number of online regions is greater than this. -097 */ -098 private int regionSplitLimit; -099 -100 /** @param server */ -101 CompactSplit(HRegionServer server) { -102 super(); -103 this.server = server; -104 this.conf = server.getConfiguration(); -105 this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, -106 DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); -107 -108 int largeThreads = Math.max(1, conf.getInt( -109 LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); -110 int smallThreads = conf.getInt( -111 SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); -112 -113 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); +087 private final HRegionServer server; +088 private final Configuration conf; +089 +090 private final ThreadPoolExecutor longCompactions; +091 private final ThreadPoolExecutor shortCompactions; +092 private final ThreadPoolExecutor splits; +093 +094 private volatile ThroughputController compactionThroughputController; +095 +096 /** +097 * Splitting should not take place if the total number of regions exceed this. +098 * This is not a hard limit to the number of regions but it is a guideline to +099 * stop splitting after number of online regions is greater than this. +100 */ +101 private int regionSplitLimit; +102 +103 /** @param server */ +104 CompactSplit(HRegionServer server) { +105 this.server = server; +106 this.conf = server.getConfiguration(); +107 this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, +108 DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); +109 +110 int largeThreads = Math.max(1, conf.getInt( +111 LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); +112 int smallThreads = conf.getInt( +113 SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 114 -115 // if we have throttle threads, make sure the user also specified size -116 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); -117 -118 final String n = Thread.currentThread().getName(); +115 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); +116 +117 // if we have throttle threads, make sure the user also specified size +118 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 119 -120 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); -121 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, -122 60, TimeUnit.SECONDS, stealJobQueue, -123 new ThreadFactory() { -124 @Override -125 public Thread newThread(Runnable r) { -126 String name = n + "-longCompactions-" + System.currentTimeMillis(); -127 return new Thread(r, name); -128 } -129 }); -130 this.longCompactions.setRejectedExecutionHandler(new Rejection()); -131 this.longCompactions.prestartAllCoreThreads(); -132 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, -133 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), -134 new ThreadFactory() { -135 @Override -136 public Thread newThread(Runnable r) { -137 String name = n + "-shortCompactions-" + System.currentTimeMillis(); -138 return new Thread(r, name); -139 } -140 }); -141 this.shortCompactions -142 .setRejectedExecutionHandler(new Rejection()); -143 this.splits = (ThreadPoolExecutor) -144 Executors.newFixedThreadPool(splitThreads, -145 new ThreadFactory() { -146 @Override -147 public Thread newThread(Runnable r) { -148 String name = n + "-splits-" + System.currentTimeMillis(); -149 return new Thread(r, name); -150 } -151 }); -152 -153 // compaction throughput controller -154 this.compactionThroughputController = -155 CompactionThroughputControllerFactory.create(server, conf); -156 } -157 -158 @Override -159 public String toString() { -160 return "compaction_queue=(" -161 + longCompactions.getQueue().size() + ":" -162 + shortCompactions.getQueue().size() + ")" -163 + ", split_queue=" + splits.getQueue().size(); -164 } -165 -166 public String dumpQueue() { -167 StringBuffer queueLists = new StringBuffer(); -168 queueLists.append("Compaction/Split Queue dump:\n"); -169 queueLists.append(" LargeCompation Queue:\n"); -170 BlockingQueue<Runnable> lq = longCompactions.getQueue(); -171 Iterator<Runnable> it = lq.iterator(); -172 while (it.hasNext()) { -173 queueLists.append(" " + it.next().toString()); -174 queueLists.append("\n"); -175 } -176 -177 if (shortCompactions != null) { -178 queueLists.append("\n"); -179 queueLists.append(" SmallCompation Queue:\n"); -180 lq = shortCompactions.getQueue(); -181 it = lq.iterator(); -182 while (it.hasNext()) { -183 queueLists.append(" " + it.next().toString()); -184 queueLists.append("\n"); -185 } -186 } -187 -188 queueLists.append("\n"); -189 queueLists.append(" Split Queue:\n"); -190 lq = splits.getQueue(); -191 it = lq.iterator(); -192 while (it.hasNext()) { -193 queueLists.append(" " + it.next().toString()); -194 queueLists.append("\n"); -195 } -196 -197 return queueLists.toString(); -198 } -199 -200 public synchronized boolean requestSplit(final Region r) { -201 // don't split regions that are blocking -202 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) { -203 byte[] midKey = ((HRegion)r).checkSplit(); -204 if (midKey != null) { -205 requestSplit(r, midKey); -206 return true; -207 } -208 } -209 return false; -210 } -211 -212 public synchronized void requestSplit(final Region r, byte[] midKey) { -213 requestSplit(r, midKey, null); -214 } -215 -216 /* -217 * The User parameter allows the split thread to assume the correct user identity -218 */ -219 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { -220 if (midKey == null) { -221 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + -222 " not splittable because midkey=null"); -223 if (((HRegion)r).shouldForceSplit()) { -224 ((HRegion)r).clearSplit(); -225 } -226 return; -227 } -228 try { -229 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); -230 if (LOG.isDebugEnabled()) { -231 LOG.debug("Splitting " + r + ", " + this); -232 } -233 } catch (RejectedExecutionException ree) { -234 LOG.info("Could not execute split for " + r, ree); -235 } -236 } -237 -238 public synchronized void requestCompaction(HRegion region, String why, int priority, -239 CompactionLifeCycleTracker tracker, User user) throws IOException { -240 requestCompactionInternal(region, why, priority, true, tracker, user); -241 } -242 -243 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, -244 CompactionLifeCycleTracker tracker, User user) throws IOException { -245 requestCompactionInternal(region, store, why, priority, true, tracker, user); -246 } +120 final String n = Thread.currentThread().getName(); +121 +122 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); +123 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, +124 60, TimeUnit.SECONDS, stealJobQueue, +125 new ThreadFactory() { +126 @Override +127 public Thread newThread(Runnable r) { +128 String name = n + "-longCompactions-" + System.currentTimeMillis(); +129 return new Thread(r, name); +130 } +131 }); +132 this.longCompactions.setRejectedExecutionHandler(new Rejection()); +133 this.longCompactions.prestartAllCoreThreads(); +134 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, +135 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), +136 new ThreadFactory() { +137 @Override +138 public Thread newThread(Runnable r) { +139 String name = n + "-shortCompactions-" + System.currentTimeMillis(); +140 return new Thread(r, name); +141 } +142 }); +143 this.shortCompactions +144 .setRejectedExecutionHandler(new Rejection()); +145 this.splits = (ThreadPoolExecutor) +146 Executors.newFixedThreadPool(splitThreads, +147 new ThreadFactory() { +148 @Override +149 public Thread newThread(Runnable r) { +150 String name = n + "-splits-" + System.currentTimeMillis(); +151 return new Thread(r, name); +152 } +153 }); +154 +155 // compaction throughput controller +156 this.compactionThroughputController = +157 CompactionThroughputControllerFactory.create(server, conf); +158 } +159 +160 @Override +161 public String toString() { +162 return "compaction_queue=(" +163 + longCompactions.getQueue().size() + ":" +164 + shortCompactions.getQueue().size() + ")" +165 + ", split_queue=" + splits.getQueue().size(); +166 } +167 +168 public String dumpQueue() { +169 StringBuffer queueLists = new StringBuffer(); +170 queueLists.append("Compaction/Split Queue dump:\n"); +171 queueLists.append(" LargeCompation Queue:\n"); +172 BlockingQueue<Runnable> lq = longCompactions.getQueue(); +173 Iterator<Runnable> it = lq.iterator(); +174 while (it.hasNext()) { +175 queueLists.append(" " + it.next().toString()); +176 queueLists.append("\n"); +177 } +178 +179 if (shortCompactions != null) { +180 queueLists.append("\n"); +181 queueLists.append(" SmallCompation Queue:\n"); +182 lq = shortCompactions.getQueue(); +183 it = lq.iterator(); +184 while (it.hasNext()) { +185 queueLists.append(" " + it.next().toString()); +186 queueLists.append("\n"); +187 } +188 } +189 +190 queueLists.append("\n"); +191 queueLists.append(" Split Queue:\n"); +192 lq = splits.getQueue(); +193 it = lq.iterator(); +194 while (it.hasNext()) { +195 queueLists.append(" " + it.next().toString()); +196 queueLists.append("\n"); +197 } +198 +199 return queueLists.toString(); +200 } +201 +202 public synchronized boolean requestSplit(final Region r) { +203 // don't split regions that are blocking +204 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) { +205 byte[] midKey = ((HRegion)r).checkSplit(); +206 if (midKey != null) { +207 requestSplit(r, midKey); +208 return true; +209 } +210 } +211 return false; +212 } +213 +214 public synchronized void requestSplit(final Region r, byte[] midKey) { +215 requestSplit(r, midKey, null); +216 } +217 +218 /* +219 * The User parameter allows the split thread to assume the correct user identity +220 */ +221 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { +222 if (midKey == null) { +223 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + +224 " not splittable because midkey=null"); +225 if (((HRegion)r).shouldForceSplit()) { +226 ((HRegion)r).clearSplit(); +227 } +228 return; +229 } +230 try { +231 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); +232 if (LOG.isDebugEnabled()) { +233 LOG.debug("Splitting " + r + ", " + this); +234 } +235 } catch (RejectedExecutionException ree) { +236 LOG.info("Could not execute split for " + r, ree); +237 } +238 } +239 +240 // A compaction life cycle tracker to trace the execution of all the compactions triggered by one +241 // request and delegate to the source CompactionLifeCycleTracker. It will call completed method if +242 // all the compactions are finished. +243 private static final class AggregatingCompactionLifeCycleTracker +244 implements CompactionLifeCycleTracker { +245 +246 private final CompactionLifeCycleTracker tracker; 247 -248 private void requestCompactionInternal(HRegion region, String why, int priority, -249 boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException { -250 // request compaction on all stores -251 for (HStore store : region.stores.values()) { -252 requestCompactionInternal(region, store, why, priority, selectNow, tracker, user); -253 } -254 } +248 private final AtomicInteger remaining; +249 +250 public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker tracker, +251 int numberOfStores) { +252 this.tracker = tracker; +253 this.remaining = new AtomicInteger(numberOfStores); +254 } 255 -256 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, -257 boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException { -258 if (this.server.isStopped() || (region.getTableDescriptor() != null && -259 !region.getTableDescriptor().isCompactionEnabled())) { -260 return; -261 } -262 Optional<CompactionContext> compaction; -263 if (selectNow) { -264 compaction = selectCompaction(region, store, priority, tracker, user); -265 if (!compaction.isPresent()) { -266 // message logged inside -267 return; -268 } -269 } else { -270 compaction = Optional.empty(); +256 private void tryCompleted() { +257 if (remaining.decrementAndGet() == 0) { +258 tracker.completed(); +259 } +260 } +261 +262 @Override +263 public void notExecuted(Store store, String reason) { +264 tracker.notExecuted(store, reason); +265 tryCompleted(); +266 } +267 +268 @Override +269 public void beforeExecution(Store store) { +270 tracker.beforeExecution(store); 271 } 272 -273 RegionServerSpaceQuotaManager spaceQuotaManager = -274 this.server.getRegionServerSpaceQuotaManager(); -275 if (spaceQuotaManager != null && -276 spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { -277 if (LOG.isDebugEnabled()) { -278 LOG.debug("Ignoring compaction request for " + region + -279 " as an active space quota violation " + " policy disallows compactions."); -280 } -281 return; -282 } -283 -284 ThreadPoolExecutor pool; -285 if (selectNow) { -286 // compaction.get is safe as we will just return if selectNow is true but no compaction is -287 // selected -288 pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions -289 : shortCompactions; -290 } else { -291 // We assume that most compactions are small. So, put system compactions into small -292 // pool; we will do selection there, and move to large pool if necessary. -293 pool = shortCompactions; -294 } -295 pool.execute(new CompactionRunner(store, region, compaction, pool, user)); -296 region.incrementCompactionsQueuedCount(); -297 if (LOG.isDebugEnabled()) { -298 String type = (pool == shortCompactions) ? "Small " : "Large "; -299 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") -300 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); -301 } +273 @Override +274 public void afterExecution(Store store) { +275 tracker.afterExecution(store); +276 tryCompleted(); +277 } +278 } +279 +280 private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker, +281 IntSupplier numberOfStores) { +282 if (tracker == CompactionLifeCycleTracker.DUMMY) { +283 // a simple optimization to avoid creating unnecessary objects as usually we do not care about +284 // the life cycle of a compaction. +285 return tracker; +286 } else { +287 return new AggregatingCompactionLifeCycleTracker(tracker, numberOfStores.getAsInt()); +288 } +289 } +290 +291 @Override +292 public synchronized void requestCompaction(HRegion region, String why, int priority, +293 CompactionLifeCycleTracker tracker, User user) throws IOException { +294 requestCompactionInternal(region, why, priority, true, +295 wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); +296 } +297 +298 @Override +299 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, +300 CompactionLifeCycleTracker tracker, User user) throws IOException { +301 requestCompactionInternal(region, store, why, priority, true, wrap(tracker, () -> 1), user); 302 } 303 -304 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { -305 requestCompactionInternal(region, why, NO_PRIORITY, false, -306 CompactionLifeCycleTracker.DUMMY, null); -307 } -308 -309 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) -310 throws IOException { -311 requestCompactionInternal(region, store, why, NO_PRIORITY, false, -312 CompactionLifeCycleTracker.DUMMY, null); -313 } -314 -315 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, -316 CompactionLifeCycleTracker tracker, User user) throws IOException { -317 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); -318 if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) { -319 LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() + -320 " because compaction request was cancelled"); -321 } -322 return compaction; -323 } -324 -325 /** -326 * Only interrupt once it's done with a run through the work loop. -327 */ -328 void interruptIfNecessary() { -329 splits.shutdown(); -330 longCompactions.shutdown(); -331 shortCompactions.shutdown(); -332 } -333 -334 private void waitFor(ThreadPoolExecutor t, String name) { -335 boolean done = false; -336 while (!done) { -337 try { -338 done = t.awaitTermination(60, TimeUnit.SECONDS); -339 LOG.info("Waiting for " + name + " to finish..."); -340 if (!done) { -341 t.shutdownNow(); -342 } -343 } catch (InterruptedException ie) { -344 LOG.warn("Interrupted waiting for " + name + " to finish..."); -345 } -346 } -347 } -348 -349 void join() { -350 waitFor(splits, "Split Thread"); -351 waitFor(longCompactions, "Large Compaction Thread"); -352 waitFor(shortCompactions, "Small Compaction Thread"); -353 } -354 -355 /** -356 * Returns the current size of the queue containing regions that are -357 * processed. -358 * -359 * @return The current size of the regions queue. -360 */ -361 public int getCompactionQueueSize() { -362 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); +304 private void requestCompactionInternal(HRegion region, String why, int priority, +305 boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException { +306 // request compaction on all stores +307 for (HStore store : region.stores.values()) { +308 requestCompactionInternal(region, store, why, priority, selectNow, tracker, user); +309 } +310 } +311 +312 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, +313 boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException { +314 if (this.server.isStopped() || (region.getTableDescriptor() != null && +315 !region.getTableDescriptor().isCompactionEnabled())) { +316 return; +317 } +318 RegionServerSpaceQuotaManager spaceQuotaManager = +319 this.server.getRegionServerSpaceQuotaManager(); +320 if (spaceQuotaManager != null && +321 spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { +322 String reason = "Ignoring compaction request for " + region + +323 " as an active space quota violation " + " policy disallows compactions."; +324 tracker.notExecuted(store, reason); +325 LOG.debug(reason); +326 return; +327 } +328 +329 Optional<CompactionContext> compaction; +330 if (selectNow) { +331 compaction = selectCompaction(region, store, priority, tracker, user); +332 if (!compaction.isPresent()) { +333 // message logged inside +334 return; +335 } +336 } else { +337 compaction = Optional.empty(); +338 } +339 +340 ThreadPoolExecutor pool; +341 if (selectNow) { +342 // compaction.get is safe as we will just return if selectNow is true but no compaction is +343 // selected +344 pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions +345 : shortCompactions; +346 } else { +347 // We assume that most compactions are small. So, put system compactions into small +348 // pool; we will do selection there, and move to large pool if necessary. +349 pool = shortCompactions; +350 } +351 pool.execute(new CompactionRunner(store, region, compaction, pool, user)); +352 region.incrementCompactionsQueuedCount(); +353 if (LOG.isDebugEnabled()) { +354 String type = (pool == shortCompactions) ? "Small " : "Large "; +355 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") +356 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); +357 } +358 } +359 +360 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { +361 requestCompactionInternal(region, why, NO_PRIORITY, false, +362 CompactionLifeCycleTracker.DUMMY, null); 363 } 364 -365 public int getLargeCompactionQueueSize() { -366 return longCompactions.getQueue().size(); -367 } -368 -369 -370 public int getSmallCompactionQueueSize() { -371 return shortCompactions.getQueue().size(); -372 } -373 -374 public int getSplitQueueSize() { -375 return splits.getQueue().size(); -376 } -377 -378 private boolean shouldSplitRegion() { -379 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { -380 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " -381 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); -382 } -383 return (regionSplitLimit > server.getNumberOfOnlineRegions()); -384 } -385 -386 /** -387 * @return the regionSplitLimit -388 */ -389 public int getRegionSplitLimit() { -390 return this.regionSplitLimit; -391 } -392 -393 private static final Comparator<Runnable> COMPARATOR = -394 new Comparator<Runnable>() { -395 -396 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { -397 if (r1 == r2) { -398 return 0; //they are the same request -399 } -400 // less first -401 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); -402 if (cmp != 0) { -403 return cmp; -404 } -405 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); -406 if (cmp != 0) { -407 return cmp; -408 } -409 -410 // break the tie based on hash code -411 return System.identityHashCode(r1) - System.identityHashCode(r2); -412 } -413 -414 @Override -415 public int compare(Runnable r1, Runnable r2) { -416 // CompactionRunner first -417 if (r1 instanceof CompactionRunner) { -418 if (!(r2 instanceof CompactionRunner)) { -419 return -1; -420 } -421 } else { -422 if (r2 instanceof CompactionRunner) { -423 return 1; -424 } else { -425 // break the tie based on hash code -426 return System.identityHashCode(r1) - System.identityHashCode(r2); -427 } -428 } -429 CompactionRunner o1 = (CompactionRunner) r1; -430 CompactionRunner o2 = (CompactionRunner) r2; -431 // less first -432 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); -433 if (cmp != 0) { -434 return cmp; -435 } -436 Optional<CompactionContext> c1 = o1.compaction; -437 Optional<CompactionContext> c2 = o2.compaction; -438 if (c1.isPresent()) { -439 return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1; -440 } else { -441 return c2.isPresent() ? 1 : 0; -442 } -443 } -444 }; -445 -446 private final class CompactionRunner implements Runnable { -447 private final HStore store; -448 private final HRegion region; -449 private final Optional<CompactionContext> compaction; -450 private int queuedPriority; -451 private ThreadPoolExecutor parent; -452 private User user; -453 private long time; -454 -455 public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction, -456 ThreadPoolExecutor parent, User user) { -457 super(); -458 this.store = store; -459 this.region = region; -460 this.compaction = compaction; -461 this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority() -462 : store.getCompactPriority(); -463 this.parent = parent; -464 this.user = user; -465 this.time = System.currentTimeMillis(); -466 } +365 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) +366 throws IOException { +367 requestCompactionInternal(region, store, why, NO_PRIORITY, false, +368 CompactionLifeCycleTracker.DUMMY, null); +369 } +370 +371 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, +372 CompactionLifeCycleTracker tracker, User user) throws IOException { +373 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); +374 if (!compaction.isPresent() && region.getRegionInfo() != null) { +375 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + +376 " because compaction request was cancelled"; +377 tracker.notExecuted(store, reason); +378 LOG.debug(reason); +379 } +380 return compaction; +381 } +382 +383 /** +384 * Only interrupt once it's done with a run through the work loop. +385 */ +386 void interruptIfNecessary() { +387 splits.shutdown(); +388 longCompactions.shutdown(); +389 shortCompactions.shutdown(); +390 } +391 +392 private void waitFor(ThreadPoolExecutor t, String name) { +393 boolean done = false; +394 while (!done) { +395 try { +396 done = t.awaitTermination(60, TimeUnit.SECONDS); +397 LOG.info("Waiting for " + name + " to finish..."); +398 if (!done) { +399 t.shutdownNow(); +400 } +401 } catch (InterruptedException ie) { +402 LOG.warn("Interrupted waiting for " + name + " to finish..."); +403 } +404 } +405 } +406 +407 void join() { +408 waitFor(splits, "Split Thread"); +409 waitFor(longCompactions, "Large Compaction Thread"); +410 waitFor(shortCompactions, "Small Compaction Thread"); +411 } +412 +413 /** +414 * Returns the current size of the queue containing regions that are +415 * processed. +416 * +417 * @return The current size of the regions queue. +418 */ +419 public int getCompactionQueueSize() { +420 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); +421 } +422 +423 public int getLargeCompactionQueueSize() { +424 return longCompactions.getQueue().size(); +425 } +426 +427 +428 public int getSmallCompactionQueueSize() { +429 return shortCompactions.getQueue().size(); +430 } +431 +432 public int getSplitQueueSize() { +433 return splits.getQueue().size(); +434 } +435 +436 private boolean shouldSplitRegion() { +437 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { +438 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " +439 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); +440 } +441 return (regionSplitLimit > server.getNumberOfOnlineRegions()); +442 } +443 +444 /** +445 * @return the regionSplitLimit +446 */ +447 public int getRegionSplitLimit() { +448 return this.regionSplitLimit; +449 } +450 +451 private static final Comparator<Runnable> COMPARATOR = +452 new Comparator<Runnable>() { +453 +454 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { +455 if (r1 == r2) { +456 return 0; //they are the same request +457 } +458 // less first +459 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); +460 if (cmp != 0) { +461 return cmp; +462 } +463 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); +464 if (cmp != 0) { +465 return cmp; +466 } 467 -468 @Override -469 public String toString() { -470 return compaction.map(c -> "Request = " + c.getRequest()) -471 .orElse("regionName = " + region.toString() + ", storeName = " + store.toString() + -472 ", priority = " + queuedPriority + ", time = " + time); -473 } -474 -475 private void doCompaction(User user) { -476 CompactionContext c; -477 // Common case - system compaction without a file selection. Select now. -478 if (!compaction.isPresent()) { -479 int oldPriority = this.queuedPriority; -480 this.queuedPriority = this.store.getCompactPriority(); -481 if (this.queuedPriority > oldPriority) { -482 // Store priority decreased while we were in queue (due to some other compaction?), -483 // requeue with new priority to avoid blocking potential higher priorities. -484 this.parent.execute(this); -485 return; -486 } -487 Optional<CompactionContext> selected; -488 try { -489 selected = selectCompaction(this.region, this.store, queuedPriority, -490 CompactionLifeCycleTracker.DUMMY, user); -491 } catch (IOException ex) { -492 LOG.error("Compaction selection failed " + this, ex); -493 server.checkFileSystem(); -494 region.decrementCompactionsQueuedCount(); -495 return; -496 } -497 if (!selected.isPresent()) { -498 region.decrementCompactionsQueuedCount(); -499 return; // nothing to do -500 } -501 c = selected.get(); -502 assert c.hasSelection(); -503 // Now see if we are in correct pool for the size; if not, go to the correct one. -504 // We might end up waiting for a while, so cancel the selection. -505 -506 ThreadPoolExecutor pool = -507 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; -508 -509 // Long compaction pool can process small job -510 // Short compaction pool should not process large job -511 if (this.parent == shortCompactions && pool == longCompactions) { -512 this.store.cancelRequestedCompaction(c); -513 this.parent = pool; -514 this.parent.execute(this); -515 retu