Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 79916200C6C for ; Fri, 5 May 2017 16:58:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 78305160BD0; Fri, 5 May 2017 14:58:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AA093160BC9 for ; Fri, 5 May 2017 16:58:40 +0200 (CEST) Received: (qmail 26599 invoked by uid 500); 5 May 2017 14:58:38 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 26393 invoked by uid 99); 5 May 2017 14:58:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 May 2017 14:58:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7C059F1596; Fri, 5 May 2017 14:58:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Fri, 05 May 2017 14:58:47 -0000 Message-Id: In-Reply-To: <0e98c233fcb34a92ba78cadc5679d595@git.apache.org> References: <0e98c233fcb34a92ba78cadc5679d595@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/43] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Fri, 05 May 2017 14:58:42 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e2c783b2/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplitThread.CompactionRunner.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplitThread.CompactionRunner.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplitThread.CompactionRunner.html index 43cc877..6408c36 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplitThread.CompactionRunner.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/CompactSplitThread.CompactionRunner.html @@ -428,256 +428,259 @@ 420 private int queuedPriority; 421 private ThreadPoolExecutor parent; 422 private User user; -423 -424 public CompactionRunner(Store store, Region region, -425 CompactionContext compaction, ThreadPoolExecutor parent, User user) { -426 super(); -427 this.store = store; -428 this.region = (HRegion)region; -429 this.compaction = compaction; -430 this.queuedPriority = (this.compaction == null) -431 ? store.getCompactPriority() : compaction.getRequest().getPriority(); -432 this.parent = parent; -433 this.user = user; -434 } -435 -436 @Override -437 public String toString() { -438 return (this.compaction != null) ? ("Request = " + compaction.getRequest()) -439 : ("Store = " + store.toString() + ", pri = " + queuedPriority); -440 } -441 -442 private void doCompaction(User user) { -443 // Common case - system compaction without a file selection. Select now. -444 if (this.compaction == null) { -445 int oldPriority = this.queuedPriority; -446 this.queuedPriority = this.store.getCompactPriority(); -447 if (this.queuedPriority > oldPriority) { -448 // Store priority decreased while we were in queue (due to some other compaction?), -449 // requeue with new priority to avoid blocking potential higher priorities. -450 this.parent.execute(this); -451 return; -452 } -453 try { -454 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); -455 } catch (IOException ex) { -456 LOG.error("Compaction selection failed " + this, ex); -457 server.checkFileSystem(); -458 return; -459 } -460 if (this.compaction == null) return; // nothing to do -461 // Now see if we are in correct pool for the size; if not, go to the correct one. -462 // We might end up waiting for a while, so cancel the selection. -463 assert this.compaction.hasSelection(); -464 ThreadPoolExecutor pool = store.throttleCompaction( -465 compaction.getRequest().getSize()) ? longCompactions : shortCompactions; -466 -467 // Long compaction pool can process small job -468 // Short compaction pool should not process large job -469 if (this.parent == shortCompactions && pool == longCompactions) { -470 this.store.cancelRequestedCompaction(this.compaction); -471 this.compaction = null; -472 this.parent = pool; -473 this.parent.execute(this); -474 return; -475 } -476 } -477 // Finally we can compact something. -478 assert this.compaction != null; -479 -480 this.compaction.getRequest().beforeExecute(); -481 try { -482 // Note: please don't put single-compaction logic here; -483 // put it into region/store/etc. This is CST logic. -484 long start = EnvironmentEdgeManager.currentTime(); -485 boolean completed = -486 region.compact(compaction, store, compactionThroughputController, user); -487 long now = EnvironmentEdgeManager.currentTime(); -488 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + -489 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); -490 if (completed) { -491 // degenerate case: blocked regions require recursive enqueues -492 if (store.getCompactPriority() <= 0) { -493 requestSystemCompaction(region, store, "Recursive enqueue"); -494 } else { -495 // see if the compaction has caused us to exceed max region size -496 requestSplit(region); -497 } -498 } -499 } catch (IOException ex) { -500 IOException remoteEx = -501 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; -502 LOG.error("Compaction failed " + this, remoteEx); -503 if (remoteEx != ex) { -504 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); -505 } -506 region.reportCompactionRequestFailure(); -507 server.checkFileSystem(); -508 } catch (Exception ex) { -509 LOG.error("Compaction failed " + this, ex); -510 region.reportCompactionRequestFailure(); -511 server.checkFileSystem(); -512 } finally { -513 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); -514 } -515 this.compaction.getRequest().afterExecute(); -516 } -517 -518 @Override -519 public void run() { -520 Preconditions.checkNotNull(server); -521 if (server.isStopped() -522 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { -523 return; -524 } -525 doCompaction(user); -526 } -527 -528 private String formatStackTrace(Exception ex) { -529 StringWriter sw = new StringWriter(); -530 PrintWriter pw = new PrintWriter(sw); -531 ex.printStackTrace(pw); -532 pw.flush(); -533 return sw.toString(); -534 } -535 -536 @Override -537 public int compareTo(CompactionRunner o) { -538 // Only compare the underlying request (if any), for queue sorting purposes. -539 int compareVal = queuedPriority - o.queuedPriority; // compare priority -540 if (compareVal != 0) return compareVal; -541 CompactionContext tc = this.compaction, oc = o.compaction; -542 // Sort pre-selected (user?) compactions before system ones with equal priority. -543 return (tc == null) ? ((oc == null) ? 0 : 1) -544 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest())); -545 } -546 } -547 -548 /** -549 * Cleanup class to use when rejecting a compaction request from the queue. -550 */ -551 private static class Rejection implements RejectedExecutionHandler { -552 @Override -553 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { -554 if (runnable instanceof CompactionRunner) { -555 CompactionRunner runner = (CompactionRunner)runnable; -556 LOG.debug("Compaction Rejected: " + runner); -557 runner.store.cancelRequestedCompaction(runner.compaction); -558 } -559 } -560 } -561 -562 /** -563 * {@inheritDoc} -564 */ -565 @Override -566 public void onConfigurationChange(Configuration newConf) { -567 // Check if number of large / small compaction threads has changed, and then -568 // adjust the core pool size of the thread pools, by using the -569 // setCorePoolSize() method. According to the javadocs, it is safe to -570 // change the core pool size on-the-fly. We need to reset the maximum -571 // pool size, as well. -572 int largeThreads = Math.max(1, newConf.getInt( -573 LARGE_COMPACTION_THREADS, -574 LARGE_COMPACTION_THREADS_DEFAULT)); -575 if (this.longCompactions.getCorePoolSize() != largeThreads) { -576 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + -577 " from " + this.longCompactions.getCorePoolSize() + " to " + -578 largeThreads); -579 if(this.longCompactions.getCorePoolSize() < largeThreads) { -580 this.longCompactions.setMaximumPoolSize(largeThreads); -581 this.longCompactions.setCorePoolSize(largeThreads); -582 } else { -583 this.longCompactions.setCorePoolSize(largeThreads); -584 this.longCompactions.setMaximumPoolSize(largeThreads); -585 } -586 } -587 -588 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, -589 SMALL_COMPACTION_THREADS_DEFAULT); -590 if (this.shortCompactions.getCorePoolSize() != smallThreads) { -591 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + -592 " from " + this.shortCompactions.getCorePoolSize() + " to " + -593 smallThreads); -594 if(this.shortCompactions.getCorePoolSize() < smallThreads) { -595 this.shortCompactions.setMaximumPoolSize(smallThreads); -596 this.shortCompactions.setCorePoolSize(smallThreads); -597 } else { -598 this.shortCompactions.setCorePoolSize(smallThreads); -599 this.shortCompactions.setMaximumPoolSize(smallThreads); -600 } -601 } -602 -603 int splitThreads = newConf.getInt(SPLIT_THREADS, -604 SPLIT_THREADS_DEFAULT); -605 if (this.splits.getCorePoolSize() != splitThreads) { -606 LOG.info("Changing the value of " + SPLIT_THREADS + -607 " from " + this.splits.getCorePoolSize() + " to " + -608 splitThreads); -609 if(this.splits.getCorePoolSize() < splitThreads) { -610 this.splits.setMaximumPoolSize(splitThreads); -611 this.splits.setCorePoolSize(splitThreads); -612 } else { -613 this.splits.setCorePoolSize(splitThreads); -614 this.splits.setMaximumPoolSize(splitThreads); -615 } -616 } -617 -618 ThroughputController old = this.compactionThroughputController; -619 if (old != null) { -620 old.stop("configuration change"); -621 } -622 this.compactionThroughputController = -623 CompactionThroughputControllerFactory.create(server, newConf); -624 -625 // We change this atomically here instead of reloading the config in order that upstream -626 // would be the only one with the flexibility to reload the config. -627 this.conf.reloadConfiguration(); -628 } -629 -630 protected int getSmallCompactionThreadNum() { -631 return this.shortCompactions.getCorePoolSize(); -632 } -633 -634 protected int getLargeCompactionThreadNum() { -635 return this.longCompactions.getCorePoolSize(); -636 } -637 -638 protected int getSplitThreadNum() { -639 return this.splits.getCorePoolSize(); -640 } -641 -642 /** -643 * {@inheritDoc} -644 */ -645 @Override -646 public void registerChildren(ConfigurationManager manager) { -647 // No children to register. -648 } -649 -650 /** -651 * {@inheritDoc} -652 */ -653 @Override -654 public void deregisterChildren(ConfigurationManager manager) { -655 // No children to register -656 } -657 -658 @VisibleForTesting -659 public ThroughputController getCompactionThroughputController() { -660 return compactionThroughputController; -661 } -662 -663 @VisibleForTesting -664 /** -665 * Shutdown the long compaction thread pool. -666 * Should only be used in unit test to prevent long compaction thread pool from stealing job -667 * from short compaction queue -668 */ -669 void shutdownLongCompactions(){ -670 this.longCompactions.shutdown(); -671 } -672} +423 private long time; +424 +425 public CompactionRunner(Store store, Region region, +426 CompactionContext compaction, ThreadPoolExecutor parent, User user) { +427 super(); +428 this.store = store; +429 this.region = (HRegion)region; +430 this.compaction = compaction; +431 this.queuedPriority = (this.compaction == null) +432 ? store.getCompactPriority() : compaction.getRequest().getPriority(); +433 this.parent = parent; +434 this.user = user; +435 this.time = System.currentTimeMillis(); +436 } +437 +438 @Override +439 public String toString() { +440 return (this.compaction != null) ? ("Request = " + compaction.getRequest()) +441 : ("regionName = " + region.toString() + ", storeName = " + store.toString() + +442 ", priority = " + queuedPriority + ", time = " + time); +443 } +444 +445 private void doCompaction(User user) { +446 // Common case - system compaction without a file selection. Select now. +447 if (this.compaction == null) { +448 int oldPriority = this.queuedPriority; +449 this.queuedPriority = this.store.getCompactPriority(); +450 if (this.queuedPriority > oldPriority) { +451 // Store priority decreased while we were in queue (due to some other compaction?), +452 // requeue with new priority to avoid blocking potential higher priorities. +453 this.parent.execute(this); +454 return; +455 } +456 try { +457 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); +458 } catch (IOException ex) { +459 LOG.error("Compaction selection failed " + this, ex); +460 server.checkFileSystem(); +461 return; +462 } +463 if (this.compaction == null) return; // nothing to do +464 // Now see if we are in correct pool for the size; if not, go to the correct one. +465 // We might end up waiting for a while, so cancel the selection. +466 assert this.compaction.hasSelection(); +467 ThreadPoolExecutor pool = store.throttleCompaction( +468 compaction.getRequest().getSize()) ? longCompactions : shortCompactions; +469 +470 // Long compaction pool can process small job +471 // Short compaction pool should not process large job +472 if (this.parent == shortCompactions && pool == longCompactions) { +473 this.store.cancelRequestedCompaction(this.compaction); +474 this.compaction = null; +475 this.parent = pool; +476 this.parent.execute(this); +477 return; +478 } +479 } +480 // Finally we can compact something. +481 assert this.compaction != null; +482 +483 this.compaction.getRequest().beforeExecute(); +484 try { +485 // Note: please don't put single-compaction logic here; +486 // put it into region/store/etc. This is CST logic. +487 long start = EnvironmentEdgeManager.currentTime(); +488 boolean completed = +489 region.compact(compaction, store, compactionThroughputController, user); +490 long now = EnvironmentEdgeManager.currentTime(); +491 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + +492 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); +493 if (completed) { +494 // degenerate case: blocked regions require recursive enqueues +495 if (store.getCompactPriority() <= 0) { +496 requestSystemCompaction(region, store, "Recursive enqueue"); +497 } else { +498 // see if the compaction has caused us to exceed max region size +499 requestSplit(region); +500 } +501 } +502 } catch (IOException ex) { +503 IOException remoteEx = +504 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; +505 LOG.error("Compaction failed " + this, remoteEx); +506 if (remoteEx != ex) { +507 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); +508 } +509 region.reportCompactionRequestFailure(); +510 server.checkFileSystem(); +511 } catch (Exception ex) { +512 LOG.error("Compaction failed " + this, ex); +513 region.reportCompactionRequestFailure(); +514 server.checkFileSystem(); +515 } finally { +516 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); +517 } +518 this.compaction.getRequest().afterExecute(); +519 } +520 +521 @Override +522 public void run() { +523 Preconditions.checkNotNull(server); +524 if (server.isStopped() +525 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { +526 return; +527 } +528 doCompaction(user); +529 } +530 +531 private String formatStackTrace(Exception ex) { +532 StringWriter sw = new StringWriter(); +533 PrintWriter pw = new PrintWriter(sw); +534 ex.printStackTrace(pw); +535 pw.flush(); +536 return sw.toString(); +537 } +538 +539 @Override +540 public int compareTo(CompactionRunner o) { +541 // Only compare the underlying request (if any), for queue sorting purposes. +542 int compareVal = queuedPriority - o.queuedPriority; // compare priority +543 if (compareVal != 0) return compareVal; +544 CompactionContext tc = this.compaction, oc = o.compaction; +545 // Sort pre-selected (user?) compactions before system ones with equal priority. +546 return (tc == null) ? ((oc == null) ? 0 : 1) +547 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest())); +548 } +549 } +550 +551 /** +552 * Cleanup class to use when rejecting a compaction request from the queue. +553 */ +554 private static class Rejection implements RejectedExecutionHandler { +555 @Override +556 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { +557 if (runnable instanceof CompactionRunner) { +558 CompactionRunner runner = (CompactionRunner)runnable; +559 LOG.debug("Compaction Rejected: " + runner); +560 runner.store.cancelRequestedCompaction(runner.compaction); +561 } +562 } +563 } +564 +565 /** +566 * {@inheritDoc} +567 */ +568 @Override +569 public void onConfigurationChange(Configuration newConf) { +570 // Check if number of large / small compaction threads has changed, and then +571 // adjust the core pool size of the thread pools, by using the +572 // setCorePoolSize() method. According to the javadocs, it is safe to +573 // change the core pool size on-the-fly. We need to reset the maximum +574 // pool size, as well. +575 int largeThreads = Math.max(1, newConf.getInt( +576 LARGE_COMPACTION_THREADS, +577 LARGE_COMPACTION_THREADS_DEFAULT)); +578 if (this.longCompactions.getCorePoolSize() != largeThreads) { +579 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + +580 " from " + this.longCompactions.getCorePoolSize() + " to " + +581 largeThreads); +582 if(this.longCompactions.getCorePoolSize() < largeThreads) { +583 this.longCompactions.setMaximumPoolSize(largeThreads); +584 this.longCompactions.setCorePoolSize(largeThreads); +585 } else { +586 this.longCompactions.setCorePoolSize(largeThreads); +587 this.longCompactions.setMaximumPoolSize(largeThreads); +588 } +589 } +590 +591 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, +592 SMALL_COMPACTION_THREADS_DEFAULT); +593 if (this.shortCompactions.getCorePoolSize() != smallThreads) { +594 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + +595 " from " + this.shortCompactions.getCorePoolSize() + " to " + +596 smallThreads); +597 if(this.shortCompactions.getCorePoolSize() < smallThreads) { +598 this.shortCompactions.setMaximumPoolSize(smallThreads); +599 this.shortCompactions.setCorePoolSize(smallThreads); +600 } else { +601 this.shortCompactions.setCorePoolSize(smallThreads); +602 this.shortCompactions.setMaximumPoolSize(smallThreads); +603 } +604 } +605 +606 int splitThreads = newConf.getInt(SPLIT_THREADS, +607 SPLIT_THREADS_DEFAULT); +608 if (this.splits.getCorePoolSize() != splitThreads) { +609 LOG.info("Changing the value of " + SPLIT_THREADS + +610 " from " + this.splits.getCorePoolSize() + " to " + +611 splitThreads); +612 if(this.splits.getCorePoolSize() < splitThreads) { +613 this.splits.setMaximumPoolSize(splitThreads); +614 this.splits.setCorePoolSize(splitThreads); +615 } else { +616 this.splits.setCorePoolSize(splitThreads); +617 this.splits.setMaximumPoolSize(splitThreads); +618 } +619 } +620 +621 ThroughputController old = this.compactionThroughputController; +622 if (old != null) { +623 old.stop("configuration change"); +624 } +625 this.compactionThroughputController = +626 CompactionThroughputControllerFactory.create(server, newConf); +627 +628 // We change this atomically here instead of reloading the config in order that upstream +629 // would be the only one with the flexibility to reload the config. +630 this.conf.reloadConfiguration(); +631 } +632 +633 protected int getSmallCompactionThreadNum() { +634 return this.shortCompactions.getCorePoolSize(); +635 } +636 +637 protected int getLargeCompactionThreadNum() { +638 return this.longCompactions.getCorePoolSize(); +639 } +640 +641 protected int getSplitThreadNum() { +642 return this.splits.getCorePoolSize(); +643 } +644 +645 /** +646 * {@inheritDoc} +647 */ +648 @Override +649 public void registerChildren(ConfigurationManager manager) { +650 // No children to register. +651 } +652 +653 /** +654 * {@inheritDoc} +655 */ +656 @Override +657 public void deregisterChildren(ConfigurationManager manager) { +658 // No children to register +659 } +660 +661 @VisibleForTesting +662 public ThroughputController getCompactionThroughputController() { +663 return compactionThroughputController; +664 } +665 +666 @VisibleForTesting +667 /** +668 * Shutdown the long compaction thread pool. +669 * Should only be used in unit test to prevent long compaction thread pool from stealing job +670 * from short compaction queue +671 */ +672 void shutdownLongCompactions(){ +673 this.longCompactions.shutdown(); +674 } +675}