From: stack@apache.org
To: commits@hbase.apache.org
Date: Sat, 27 May 2017 00:12:44 -0000
Subject: [12/30] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
new file mode 100644
index 0000000..e7157d0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -0,0 +1,723 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Compact region on request and then run split if appropriate
+ */
+@InterfaceAudience.Private
+public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver {
+  private static final Log LOG = LogFactory.getLog(CompactSplit.class);
+
+  // Configuration key for the large compaction threads.
+  public final static String LARGE_COMPACTION_THREADS =
+      "hbase.regionserver.thread.compaction.large";
+  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
+
+  // Configuration key for the small compaction threads.
+  public final static String SMALL_COMPACTION_THREADS =
+      "hbase.regionserver.thread.compaction.small";
+  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
+
+  // Configuration key for split threads
+  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
+  public final static int SPLIT_THREADS_DEFAULT = 1;
+
+  // Configuration keys for merge threads
+  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
+  public final static int MERGE_THREADS_DEFAULT = 1;
+
+  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
+      "hbase.regionserver.regionSplitLimit";
+  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
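----------------------------------------------------------------------
[Illustrative aside, not part of the patch] The keys above are ordinary
hbase-site.xml / Configuration settings. A minimal sketch of setting them
programmatically; the values are illustrative only, not tuning advice, and
only hadoop-common is assumed on the classpath:

    import org.apache.hadoop.conf.Configuration;

    public class CompactSplitConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("hbase.regionserver.thread.compaction.large", 2);  // LARGE_COMPACTION_THREADS
        conf.setInt("hbase.regionserver.thread.compaction.small", 4);  // SMALL_COMPACTION_THREADS
        conf.setInt("hbase.regionserver.thread.split", 2);             // SPLIT_THREADS
        conf.setInt("hbase.regionserver.thread.merge", 1);             // MERGE_THREADS
        conf.setInt("hbase.regionserver.regionSplitLimit", 1000);      // REGION_SERVER_REGION_SPLIT_LIMIT
        System.out.println(conf.getInt("hbase.regionserver.thread.split", 1)); // prints 2
      }
    }
----------------------------------------------------------------------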
+
+  private final HRegionServer server;
+  private final Configuration conf;
+
+  private final ThreadPoolExecutor longCompactions;
+  private final ThreadPoolExecutor shortCompactions;
+  private final ThreadPoolExecutor splits;
+  private final ThreadPoolExecutor mergePool;
+
+  private volatile ThroughputController compactionThroughputController;
+
+  /**
+   * Splitting should not take place if the total number of regions exceeds this.
+   * This is not a hard limit to the number of regions but it is a guideline to
+   * stop splitting after the number of online regions is greater than this.
+   */
+  private int regionSplitLimit;
+
+  /** @param server */
+  CompactSplit(HRegionServer server) {
+    super();
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
+        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
+
+    int largeThreads = Math.max(1, conf.getInt(
+        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(
+        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+
+    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
+
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+    final String n = Thread.currentThread().getName();
+
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+        60, TimeUnit.SECONDS, stealJobQueue,
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-longCompactions-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+        });
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
+        60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-shortCompactions-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+        });
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-splits-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+        });
+    int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
+    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(mergeThreads,
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-merges-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+        });
+
+    // compaction throughput controller
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, conf);
+  }
+
+  @Override
+  public String toString() {
+    return "compaction_queue=("
+        + longCompactions.getQueue().size() + ":"
+        + shortCompactions.getQueue().size() + ")"
+        + ", split_queue=" + splits.getQueue().size();
+  }
+
+  public String dumpQueue() {
+    StringBuffer queueLists = new StringBuffer();
+    queueLists.append("Compaction/Split Queue dump:\n");
+    queueLists.append("  LargeCompaction Queue:\n");
+    BlockingQueue<Runnable> lq = longCompactions.getQueue();
+    Iterator<Runnable> it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    if (shortCompactions != null) {
+      queueLists.append("\n");
+      queueLists.append("  SmallCompaction Queue:\n");
+      lq = shortCompactions.getQueue();
+      it = lq.iterator();
+      while (it.hasNext()) {
+        queueLists.append("    " + it.next().toString());
+        queueLists.append("\n");
+      }
+    }
+
+    queueLists.append("\n");
+    queueLists.append("  Split Queue:\n");
+    lq = splits.getQueue();
+    it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    return queueLists.toString();
+  }
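----------------------------------------------------------------------
[Illustrative aside, not part of the patch] The constructor wires the long
pool to a StealJobQueue and the short pool to its companion returned by
getStealFromQueue(): when the long pool has no large compactions queued, its
threads can pull ("steal") jobs destined for the short pool. StealJobQueue is
HBase-internal; the sketch below is a conceptual stand-in on JDK types only,
NOT the real implementation (which coordinates the two queues with a shared
lock rather than polling):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Conceptual stand-in: consumers of this queue fall back to the
    // companion "steal-from" queue when their own queue is empty.
    public class TinyStealQueue<E> extends LinkedBlockingQueue<E> {
      private final BlockingQueue<E> stealFrom = new LinkedBlockingQueue<>();

      public BlockingQueue<E> getStealFromQueue() {
        return stealFrom;
      }

      @Override
      public E take() throws InterruptedException {
        while (true) {
          E e = super.poll(10, TimeUnit.MILLISECONDS);     // own work first
          if (e == null) {
            e = stealFrom.poll(10, TimeUnit.MILLISECONDS); // otherwise steal
          }
          if (e != null) {
            return e;
          }
        }
      }
    }
----------------------------------------------------------------------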
+
+  public synchronized void requestRegionsMerge(final Region a, final Region b,
+      final boolean forcible, long masterSystemTime, User user) {
+    try {
+      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime, user));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+            + forcible + ". " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
+          + forcible, ree);
+    }
+  }
+
+  public synchronized boolean requestSplit(final Region r) {
+    // don't split regions that are blocking
+    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
+      byte[] midKey = ((HRegion)r).checkSplit();
+      if (midKey != null) {
+        requestSplit(r, midKey);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public synchronized void requestSplit(final Region r, byte[] midKey) {
+    requestSplit(r, midKey, null);
+  }
+
+  /*
+   * The User parameter allows the split thread to assume the correct user identity
+   */
+  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
+    if (midKey == null) {
+      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
+          " not splittable because midkey=null");
+      if (((HRegion)r).shouldForceSplit()) {
+        ((HRegion)r).clearSplit();
+      }
+      return;
+    }
+    try {
+      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Splitting " + r + ", " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.info("Could not execute split for " + r, ree);
+    }
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
+      throws IOException {
+    return requestCompaction(r, why, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
+      List<Pair<CompactionRequest, Store>> requests) throws IOException {
+    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
+  }
+
+  @Override
+  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
+      final String why, CompactionRequest request) throws IOException {
+    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
+    return requestCompactionInternal(r, why, p, requests, true, user);
+  }
+
+  private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
+      throws IOException {
+    // not a special compaction request, so make our own list
+    List<CompactionRequest> ret = null;
+    if (requests == null) {
+      ret = selectNow ? new ArrayList<>(r.getStores().size()) : null;
+      for (Store s : r.getStores()) {
+        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
+        if (selectNow) ret.add(cr);
+      }
+    } else {
+      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
+      ret = new ArrayList<>(requests.size());
+      for (Pair<CompactionRequest, Store> pair : requests) {
+        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
+      }
+    }
+    return ret;
+  }
+
+  public CompactionRequest requestCompaction(final Region r, final Store s,
+      final String why, int priority, CompactionRequest request, User user) throws IOException {
+    return requestCompactionInternal(r, s, why, priority, request, true, user);
+  }
+
+  public synchronized void requestSystemCompaction(
+      final Region r, final String why) throws IOException {
+    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
+  }
+
+  public void requestSystemCompaction(
+      final Region r, final Store s, final String why) throws IOException {
+    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
+  }
+
+  /**
+   * @param r region the store belongs to
+   * @param s Store to request compaction on
+   * @param why Why compaction was requested -- used in debug messages
+   * @param priority override the default priority (NO_PRIORITY == decide)
+   * @param request custom compaction request. Can be null, in which case a simple
+   *          compaction will be used.
+   */
+  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
+      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
+      throws IOException {
+    if (this.server.isStopped()
+        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
+      return null;
+    }
+
+    CompactionContext compaction = null;
+    if (selectNow) {
+      compaction = selectCompaction(r, s, priority, request, user);
+      if (compaction == null) return null; // message logged inside
+    }
+
+    final RegionServerSpaceQuotaManager spaceQuotaManager =
+        this.server.getRegionServerSpaceQuotaManager();
+    if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
+        r.getTableDesc().getTableName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
+            + "policy disallows compactions.");
+      }
+      return null;
+    }
+
+    // We assume that most compactions are small. So, put system compactions into the small
+    // pool; we will do selection there, and move to the large pool if necessary.
+    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
+        ? longCompactions : shortCompactions;
+    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == shortCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
+    }
+    return selectNow ? compaction.getRequest() : null;
+  }
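----------------------------------------------------------------------
[Illustrative aside, not part of the patch] The routing above sends a
selected compaction to longCompactions only when the store's
throttleCompaction(size) says the request is large; everything else,
including all system compactions, starts in the short pool. A toy sketch of
that decision shape; the 2 GB threshold is an assumption for illustration,
since the real cut-off lives inside the store's throttleCompaction():

    public class PoolRouter {
      // Threshold is illustrative only; HBase encapsulates this in
      // Store.throttleCompaction().
      static final long THROTTLE_BYTES = 2L * 1024 * 1024 * 1024;

      static String choosePool(long requestSizeBytes) {
        return requestSizeBytes > THROTTLE_BYTES ? "longCompactions" : "shortCompactions";
      }

      public static void main(String[] args) {
        System.out.println(choosePool(128L * 1024 * 1024));      // shortCompactions
        System.out.println(choosePool(3L * 1024 * 1024 * 1024)); // longCompactions
      }
    }
----------------------------------------------------------------------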
+
+  private CompactionContext selectCompaction(final Region r, final Store s,
+      int priority, CompactionRequest request, User user) throws IOException {
+    CompactionContext compaction = s.requestCompaction(priority, request, user);
+    if (compaction == null) {
+      if (LOG.isDebugEnabled() && r.getRegionInfo() != null) {
+        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
+            " because compaction request was cancelled");
+      }
+      return null;
+    }
+    assert compaction.hasSelection();
+    if (priority != Store.NO_PRIORITY) {
+      compaction.getRequest().setPriority(priority);
+    }
+    return compaction;
+  }
+
+  /**
+   * Only interrupt once it's done with a run through the work loop.
+   */
+  void interruptIfNecessary() {
+    splits.shutdown();
+    longCompactions.shutdown();
+    shortCompactions.shutdown();
+  }
+
+  private void waitFor(ThreadPoolExecutor t, String name) {
+    boolean done = false;
+    while (!done) {
+      try {
+        done = t.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.info("Waiting for " + name + " to finish...");
+        if (!done) {
+          t.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted waiting for " + name + " to finish...");
+      }
+    }
+  }
+
+  void join() {
+    waitFor(splits, "Split Thread");
+    waitFor(longCompactions, "Large Compaction Thread");
+    waitFor(shortCompactions, "Small Compaction Thread");
+  }
+
+  /**
+   * Returns the current size of the queue containing regions that are
+   * processed.
+   *
+   * @return The current size of the regions queue.
+   */
+  public int getCompactionQueueSize() {
+    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
+  }
+
+  public int getLargeCompactionQueueSize() {
+    return longCompactions.getQueue().size();
+  }
+
+  public int getSmallCompactionQueueSize() {
+    return shortCompactions.getQueue().size();
+  }
+
+  public int getSplitQueueSize() {
+    return splits.getQueue().size();
+  }
+
+  private boolean shouldSplitRegion() {
+    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
+      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
+          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
+    }
+    return (regionSplitLimit > server.getNumberOfOnlineRegions());
+  }
+
+  /**
+   * @return the regionSplitLimit
+   */
+  public int getRegionSplitLimit() {
+    return this.regionSplitLimit;
+  }
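----------------------------------------------------------------------
[Illustrative aside, not part of the patch] The queue-size getters above are
thin wrappers over ThreadPoolExecutor.getQueue().size(), which is what
metrics code would poll. A minimal self-contained demonstration of that call
on a plain executor:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class QueueDepthExample {
      public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> sleep(200)); // handed straight to the single worker
        pool.execute(() -> { });        // this one waits in the queue
        // Same call CompactSplit's getters wrap:
        System.out.println("queued=" + pool.getQueue().size()); // prints 1
        pool.shutdown();
      }
      static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { }
      }
    }
----------------------------------------------------------------------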
("Request = " + compaction.getRequest()) + : ("regionName = " + region.toString() + ", storeName = " + store.toString() + + ", priority = " + queuedPriority + ", time = " + time); + } + + private void doCompaction(User user) { + // Common case - system compaction without a file selection. Select now. + if (this.compaction == null) { + int oldPriority = this.queuedPriority; + this.queuedPriority = this.store.getCompactPriority(); + if (this.queuedPriority > oldPriority) { + // Store priority decreased while we were in queue (due to some other compaction?), + // requeue with new priority to avoid blocking potential higher priorities. + this.parent.execute(this); + return; + } + try { + this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); + } catch (IOException ex) { + LOG.error("Compaction selection failed " + this, ex); + server.checkFileSystem(); + return; + } + if (this.compaction == null) return; // nothing to do + // Now see if we are in correct pool for the size; if not, go to the correct one. + // We might end up waiting for a while, so cancel the selection. + assert this.compaction.hasSelection(); + ThreadPoolExecutor pool = store.throttleCompaction( + compaction.getRequest().getSize()) ? longCompactions : shortCompactions; + + // Long compaction pool can process small job + // Short compaction pool should not process large job + if (this.parent == shortCompactions && pool == longCompactions) { + this.store.cancelRequestedCompaction(this.compaction); + this.compaction = null; + this.parent = pool; + this.parent.execute(this); + return; + } + } + // Finally we can compact something. + assert this.compaction != null; + + this.compaction.getRequest().beforeExecute(); + try { + // Note: please don't put single-compaction logic here; + // put it into region/store/etc. This is CST logic. + long start = EnvironmentEdgeManager.currentTime(); + boolean completed = + region.compact(compaction, store, compactionThroughputController, user); + long now = EnvironmentEdgeManager.currentTime(); + LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); + if (completed) { + // degenerate case: blocked regions require recursive enqueues + if (store.getCompactPriority() <= 0) { + requestSystemCompaction(region, store, "Recursive enqueue"); + } else { + // see if the compaction has caused us to exceed max region size + requestSplit(region); + } + } + } catch (IOException ex) { + IOException remoteEx = + ex instanceof RemoteException ? 
+
+    @Override
+    public void run() {
+      Preconditions.checkNotNull(server);
+      if (server.isStopped()
+          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
+        return;
+      }
+      doCompaction(user);
+    }
+
+    private String formatStackTrace(Exception ex) {
+      StringWriter sw = new StringWriter();
+      PrintWriter pw = new PrintWriter(sw);
+      ex.printStackTrace(pw);
+      pw.flush();
+      return sw.toString();
+    }
+
+    @Override
+    public int compareTo(CompactionRunner o) {
+      // Only compare the underlying request (if any), for queue sorting purposes.
+      int compareVal = queuedPriority - o.queuedPriority; // compare priority
+      if (compareVal != 0) return compareVal;
+      CompactionContext tc = this.compaction, oc = o.compaction;
+      // Sort pre-selected (user?) compactions before system ones with equal priority.
+      return (tc == null) ? ((oc == null) ? 0 : 1)
+          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
+    }
+  }
+
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private static class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      if (runnable instanceof CompactionRunner) {
+        CompactionRunner runner = (CompactionRunner)runnable;
+        LOG.debug("Compaction Rejected: " + runner);
+        runner.store.cancelRequestedCompaction(runner.compaction);
+      }
+    }
+  }
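----------------------------------------------------------------------
[Illustrative aside, not part of the patch] compareTo() above makes the
compaction queues priority-aware: a lower queuedPriority sorts first, and at
equal priority a runner that already has a selected request sorts before a
system one. A toy analogue with the same comparison shape, ordered by a
PriorityBlockingQueue:

    import java.util.concurrent.PriorityBlockingQueue;

    public class RunnerOrderExample {
      // Lower priority value first; at equal priority, pre-selected first.
      static final class Runner implements Comparable<Runner> {
        final String name; final int priority; final boolean selected;
        Runner(String name, int priority, boolean selected) {
          this.name = name; this.priority = priority; this.selected = selected;
        }
        @Override public int compareTo(Runner o) {
          int c = priority - o.priority;
          if (c != 0) return c;
          return (selected == o.selected) ? 0 : (selected ? -1 : 1);
        }
      }
      public static void main(String[] args) {
        PriorityBlockingQueue<Runner> q = new PriorityBlockingQueue<>();
        q.add(new Runner("system-p5", 5, false));
        q.add(new Runner("user-p5", 5, true));
        q.add(new Runner("user-p1", 1, true));
        while (!q.isEmpty()) System.out.println(q.poll().name);
        // Prints: user-p1, user-p5, system-p5
      }
    }
----------------------------------------------------------------------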
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    // Check if the number of large / small compaction threads has changed, and then
+    // adjust the core pool size of the thread pools, by using the
+    // setCorePoolSize() method. According to the javadocs, it is safe to
+    // change the core pool size on-the-fly. We need to reset the maximum
+    // pool size, as well.
+    int largeThreads = Math.max(1, newConf.getInt(
+        LARGE_COMPACTION_THREADS,
+        LARGE_COMPACTION_THREADS_DEFAULT));
+    if (this.longCompactions.getCorePoolSize() != largeThreads) {
+      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
+          " from " + this.longCompactions.getCorePoolSize() + " to " +
+          largeThreads);
+      if (this.longCompactions.getCorePoolSize() < largeThreads) {
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+        this.longCompactions.setCorePoolSize(largeThreads);
+      } else {
+        this.longCompactions.setCorePoolSize(largeThreads);
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+      }
+    }
+
+    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
+        SMALL_COMPACTION_THREADS_DEFAULT);
+    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
+      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
+          " from " + this.shortCompactions.getCorePoolSize() + " to " +
+          smallThreads);
+      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+        this.shortCompactions.setCorePoolSize(smallThreads);
+      } else {
+        this.shortCompactions.setCorePoolSize(smallThreads);
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+      }
+    }
+
+    int splitThreads = newConf.getInt(SPLIT_THREADS,
+        SPLIT_THREADS_DEFAULT);
+    if (this.splits.getCorePoolSize() != splitThreads) {
+      LOG.info("Changing the value of " + SPLIT_THREADS +
+          " from " + this.splits.getCorePoolSize() + " to " +
+          splitThreads);
+      if (this.splits.getCorePoolSize() < splitThreads) {
+        this.splits.setMaximumPoolSize(splitThreads);
+        this.splits.setCorePoolSize(splitThreads);
+      } else {
+        this.splits.setCorePoolSize(splitThreads);
+        this.splits.setMaximumPoolSize(splitThreads);
+      }
+    }
+
+    ThroughputController old = this.compactionThroughputController;
+    if (old != null) {
+      old.stop("configuration change");
+    }
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, newConf);
+
+    // We change this atomically here instead of reloading the config in order that upstream
+    // would be the only one with the flexibility to reload the config.
+    this.conf.reloadConfiguration();
+  }
+
+  protected int getSmallCompactionThreadNum() {
+    return this.shortCompactions.getCorePoolSize();
+  }
+
+  protected int getLargeCompactionThreadNum() {
+    return this.longCompactions.getCorePoolSize();
+  }
+
+  protected int getSplitThreadNum() {
+    return this.splits.getCorePoolSize();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerChildren(ConfigurationManager manager) {
+    // No children to register.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deregisterChildren(ConfigurationManager manager) {
+    // No children to deregister.
+  }
+
+  @VisibleForTesting
+  public ThroughputController getCompactionThroughputController() {
+    return compactionThroughputController;
+  }
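----------------------------------------------------------------------
[Illustrative aside, not part of the patch] The resize blocks above are
order-sensitive: ThreadPoolExecutor.setMaximumPoolSize() rejects a maximum
below the current core size (and recent JDKs likewise reject a core above
the maximum), so growing sets max first and shrinking sets core first,
keeping core <= max at every intermediate step. A self-contained sketch of
the same ordering:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolResizeExample {
      // Mirrors onConfigurationChange: grow max first, shrink core first.
      static void resize(ThreadPoolExecutor pool, int n) {
        if (pool.getCorePoolSize() < n) {
          pool.setMaximumPoolSize(n);
          pool.setCorePoolSize(n);
        } else {
          pool.setCorePoolSize(n);
          pool.setMaximumPoolSize(n);
        }
      }
      public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
        resize(pool, 8); // grow: max to 8, then core to 8
        resize(pool, 1); // shrink: core to 1, then max to 1
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 1/1
        pool.shutdown();
      }
    }
----------------------------------------------------------------------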
+
+  /**
+   * Shutdown the long compaction thread pool.
+   * Should only be used in unit tests, to prevent the long compaction thread pool from
+   * stealing jobs from the short compaction queue.
+   */
+  @VisibleForTesting
+  void shutdownLongCompactions() {
+    this.longCompactions.shutdown();
+  }
+
+  public void clearLongCompactionsQueue() {
+    longCompactions.getQueue().clear();
+  }
+
+  public void clearShortCompactionsQueue() {
+    shortCompactions.getQueue().clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
deleted file mode 100644
index 7791ea7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ /dev/null
@@ -1,695 +0,0 @@
[... the 695 deleted lines are omitted here: they are the body of the old CompactSplitThread.java, which matches the new CompactSplit.java above except for the class name, the absence of the merge pool, MERGE_THREADS keys, and requestRegionsMerge(), and minor log-message wording ...]
http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
index 2773e00..6b8948b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
@@ -34,8 +34,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A chore service that periodically cleans up the compacted files when there are no active readers
- * using those compacted files and also helps in clearing the block cache with these compacted
- * file entries
+ * using those compacted files and also helps in clearing the block cache of these compacted
+ * file entries.
  */
 @InterfaceAudience.Private
 public class CompactedHFilesDischarger extends ScheduledChore {
@@ -71,45 +71,56 @@ public class CompactedHFilesDischarger extends ScheduledChore {
     this.useExecutor = useExecutor;
   }
 
+  /**
+   * CompactedHFilesDischarger runs asynchronously by default using the hosting
+   * RegionServer's Executor. In tests it can be useful to force a synchronous
+   * cleanup. Use this method to set no-executor before you call run.
+   * @return The old setting for useExecutor
+   */
+  @VisibleForTesting
+  boolean setUseExecutor(final boolean useExecutor) {
+    boolean oldSetting = this.useExecutor;
+    this.useExecutor = useExecutor;
+    return oldSetting;
+  }
+
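----------------------------------------------------------------------
[Illustrative aside, not part of the patch] A hedged sketch of how a test
might use setUseExecutor() to force inline archival. The helper, its
parameter sources, and the (period, stopper, services) constructor are
assumptions for illustration; since setUseExecutor() is package-private,
such a helper would have to live in the same package:

    import org.apache.hadoop.hbase.Stoppable;

    public class SyncDischargerExample {
      // Hypothetical test helper: stopper and services come from a test harness.
      static void archiveSynchronously(Stoppable stopper, RegionServerServices services) {
        CompactedHFilesDischarger discharger =
            new CompactedHFilesDischarger(1000, stopper, services);
        boolean old = discharger.setUseExecutor(false); // force inline cleanup
        try {
          discharger.chore();                           // archives compacted files now
        } finally {
          discharger.setUseExecutor(old);               // restore the previous mode
        }
      }
    }
----------------------------------------------------------------------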
   @Override
   public void chore() {
     // Noop if rss is null. This will never happen in a normal condition except for cases
     // when the test case is not spinning up a cluster
     if (regionServerServices == null) return;
     List<Region> onlineRegions = regionServerServices.getOnlineRegions();
-    if (onlineRegions != null) {
-      for (Region region : onlineRegions) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(
-              "Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
-        }
-        for (Store store : region.getStores()) {
-          try {
-            if (useExecutor && regionServerServices != null) {
-              CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
-                  (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
-                  (HStore) store);
-              regionServerServices.getExecutorService().submit(handler);
-            } else {
-              // call synchronously if the RegionServerServices are not
-              // available
-              store.closeAndArchiveCompactedFiles();
-            }
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Completed archiving the compacted files for the region "
-                  + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
-            }
-          } catch (Exception e) {
-            LOG.error("Exception while trying to close and archive the compacted store "
-                + "files of the store " + store.getColumnFamilyName() + " in the" + " region "
-                + region.getRegionInfo(), e);
-          }
-        }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(
-              "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
-        }
-      }
-    }
+    if (onlineRegions == null) return;
+    for (Region region : onlineRegions) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo());
+      }
+      for (Store store : region.getStores()) {
+        try {
+          if (useExecutor && regionServerServices != null) {
+            CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+                (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
+                (HStore) store);
+            regionServerServices.getExecutorService().submit(handler);
+          } else {
+            // call synchronously if the RegionServerServices are not
+            // available
+            store.closeAndArchiveCompactedFiles();
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Completed archiving the compacted files for the region "
+                + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+          }
+        } catch (Exception e) {
+          LOG.error("Exception while trying to close and archive the compacted store "
+              + "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+              + region.getRegionInfo(), e);
+        }
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+            "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
+      }
+    }
   }
-}
+}
\ No newline at end of file
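----------------------------------------------------------------------
[Illustrative aside, not part of the patch] CompactedHFilesDischarger extends
ScheduledChore and is driven by a ChoreService on the server. A minimal,
generic example of that chore pattern from the same codebase era (not the
discharger itself; the period is in the chore's default time unit,
milliseconds):

    import org.apache.hadoop.hbase.ChoreService;
    import org.apache.hadoop.hbase.ScheduledChore;
    import org.apache.hadoop.hbase.Stoppable;

    public class ChoreExample {
      public static void main(String[] args) throws InterruptedException {
        Stoppable stopper = new Stoppable() {
          private volatile boolean stopped;
          @Override public void stop(String why) { stopped = true; }
          @Override public boolean isStopped() { return stopped; }
        };
        ChoreService service = new ChoreService("example");
        // chore() runs every 100 ms until the stopper is tripped.
        ScheduledChore chore = new ScheduledChore("ticker", stopper, 100) {
          @Override protected void chore() { System.out.println("tick"); }
        };
        service.scheduleChore(chore);
        Thread.sleep(350);
        stopper.stop("done");
        service.shutdown();
      }
    }
----------------------------------------------------------------------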
!hasReferences(); + if (LOG.isTraceEnabled()) { + LOG.trace("isSplittable=" + result); + } + return result; } - /** - * @return true if region is mergeable - */ + @Override public boolean isMergeable() { if (!isAvailable()) { LOG.debug("Region " + this @@ -5117,11 +5117,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", - justification = "Notify is about post replay. Intentional") @Override public boolean refreshStoreFiles() throws IOException { - if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + return refreshStoreFiles(false); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", + justification = "Notify is about post replay. Intentional") + protected boolean refreshStoreFiles(boolean force) throws IOException { + if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return false; // if primary nothing to do } @@ -5879,7 +5883,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); + KeyValueScanner scanner; + try { + scanner = store.getScanner(scan, entry.getValue(), this.readPt); + } catch (FileNotFoundException e) { + throw handleFileNotFound(e); + } instantiatedScanners.add(scanner); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { @@ -5903,19 +5912,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void handleFileNotFound(Throwable fnfe) { + private FileNotFoundException handleFileNotFound(FileNotFoundException fnfe) { // Try reopening the region since we have lost some storefiles. // See HBASE-17712 for more details. - LOG.warn("A store file got lost, so close and reopen region", fnfe); + LOG.warn("Store file is lost; close and reopen region", fnfe); if (regionUnassigner != null) { regionUnassigner.unassign(); } + return fnfe; } private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) { if (t instanceof FileNotFoundException) { - handleFileNotFound(t); + handleFileNotFound((FileNotFoundException)t); } // remove the scanner read point before throwing the exception scannerReadPoints.remove(this); @@ -6061,29 +6071,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean tmpKeepProgress = scannerContext.getKeepProgress(); // Scanning between column families and thus the scope is between cells LimitScope limitScope = LimitScope.BETWEEN_CELLS; - do { - // We want to maintain any progress that is made towards the limits while scanning across - // different column families. To do this, we toggle the keep progress flag on during calls - // to the StoreScanner to ensure that any progress made thus far is not wiped away.
- scannerContext.setKeepProgress(true); - heap.next(results, scannerContext); - scannerContext.setKeepProgress(tmpKeepProgress); - - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); - if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); - if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { - return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); - } else if (scannerContext.checkSizeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } else if (scannerContext.checkTimeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } - } while (moreCellsInRow); + try { + do { + // We want to maintain any progress that is made towards the limits while scanning across + // different column families. To do this, we toggle the keep progress flag on during calls + // to the StoreScanner to ensure that any progress made thus far is not wiped away. + scannerContext.setKeepProgress(true); + heap.next(results, scannerContext); + scannerContext.setKeepProgress(tmpKeepProgress); + + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); + if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); + if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { + return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); + } else if (scannerContext.checkSizeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } + } while (moreCellsInRow); + } catch (FileNotFoundException e) { + throw handleFileNotFound(e); + } return nextKv != null; } @@ -6432,8 +6446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = this.joinedHeap.requestSeek(kv, true, true) || result; } } catch (FileNotFoundException e) { - handleFileNotFound(e); - throw e; + throw handleFileNotFound(e); } finally { closeRegionOperation(); } @@ -7818,6 +7831,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return null; } + // Can't split a region that is closing. 
+ if (this.isClosing()) { + return null; + } + if (!splitPolicy.shouldSplit()) { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 014427d..59a0fe5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -318,13 +318,15 @@ public class HRegionFileSystem { * @throws IOException */ public boolean hasReferences(final String familyName) throws IOException { - FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName)); + Path storeDir = getStoreDir(familyName); + FileStatus[] files = FSUtils.listStatus(fs, storeDir); if (files != null) { for(FileStatus stat: files) { if(stat.isDirectory()) { continue; } if(StoreFileInfo.isReference(stat.getPath())) { + if (LOG.isTraceEnabled()) LOG.trace("Reference " + stat.getPath()); return true; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3ca061a..9315b0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.client.NonceGenerator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.locking.EntityLock; @@ -170,8 +169,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -179,7 +176,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JSONBean; import 
org.apache.hadoop.hbase.util.JvmPauseMonitor; @@ -208,21 +204,23 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; -import sun.misc.Signal; -import sun.misc.SignalHandler; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import sun.misc.Signal; +import sun.misc.SignalHandler; + /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) -@SuppressWarnings({ "deprecation", "restriction" }) +@SuppressWarnings({ "deprecation"}) public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId, ConfigurationObserver { + // Time to pause if master says 'please hold'. Make configurable if needed. + private static final int INIT_PAUSE_TIME_MS = 1000; public static final String REGION_LOCK_AWAIT_TIME_SEC = "hbase.regionserver.region.lock.await.time.sec"; @@ -283,7 +281,7 @@ public class HRegionServer extends HasThread implements protected ReplicationSinkService replicationSinkHandler; // Compactions - public CompactSplitThread compactSplitThread; + public CompactSplit compactSplitThread; /** * Map of regions currently being served by this region server. Key is the @@ -514,7 +512,8 @@ public class HRegionServer extends HasThread implements */ protected final ConfigurationManager configurationManager; - private CompactedHFilesDischarger compactedFileDischarger; + @VisibleForTesting + CompactedHFilesDischarger compactedFileDischarger; private volatile ThroughputController flushThroughputController; @@ -914,7 +913,7 @@ public class HRegionServer extends HasThread implements this.cacheFlusher = new MemStoreFlusher(conf, this); // Compaction thread - this.compactSplitThread = new CompactSplitThread(this); + this.compactSplitThread = new CompactSplit(this); // Background thread to check for compactions; needed if region has not gotten updates // in a while. It will take care of not checking too frequently on store-by-store basis. @@ -1432,7 +1431,7 @@ public class HRegionServer extends HasThread implements // Only print out regions still closing if a small number else will // swamp the log. 
if (count < 10 && LOG.isDebugEnabled()) { - LOG.debug(this.onlineRegions); + LOG.debug("Online Regions=" + this.onlineRegions); } } } @@ -1779,7 +1778,7 @@ public class HRegionServer extends HasThread implements final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds final static int MIN_DELAY_TIME = 0; // millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { - super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); + super("MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; } @@ -2192,6 +2191,8 @@ public class HRegionServer extends HasThread implements transition.addRegionInfo(HRegionInfo.convert(hri)); } ReportRegionStateTransitionRequest request = builder.build(); + int tries = 0; + long pauseTime = INIT_PAUSE_TIME_MS; while (keepLooping()) { RegionServerStatusService.BlockingInterface rss = rssStub; try { @@ -2202,95 +2203,40 @@ public class HRegionServer extends HasThread implements ReportRegionStateTransitionResponse response = rss.reportRegionStateTransition(null, request); if (response.hasErrorMessage()) { - LOG.info("Failed to transition " + hris[0] + LOG.info("Failed transition " + hris[0] + " to " + code + ": " + response.getErrorMessage()); return false; } + if (LOG.isTraceEnabled()) { + LOG.trace("TRANSITION REPORTED " + request); + } return true; } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.info("Failed to report region transition, will retry", ioe); - if (rssStub == rss) { - rssStub = null; - } - } - } - return false; - } - - @Override - public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) { - NonceGenerator ng = clusterConnection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(); - final long nonce = ng.newNonce(); - long procId = -1; - SplitTableRegionRequest request = - RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce); - - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - SplitTableRegionResponse response = rss.splitRegion(null, request); - - //TODO: should we limit the retry number before quitting? - if (response == null || (procId = response.getProcId()) == -1) { - LOG.warn("Failed to split " + regionInfo + " retrying..."); - continue; + boolean pause = ioe instanceof ServerNotRunningYetException || + ioe instanceof PleaseHoldException; + if (pause) { + // Do backoff else we flood the Master with requests. + pauseTime = ConnectionUtils.getPauseTime(pauseTime, tries); + } else { + pauseTime = INIT_PAUSE_TIME_MS; // Reset. } - - break; - } catch (ServiceException se) { - // TODO: retry or just fail - IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.info("Failed to split region, will retry", ioe); + LOG.info("Failed report of region transition; retry (#" + tries + ")" + + (pause? 
+ " after " + pauseTime + "ms delay (Master is coming online...).": + " immediately."), + ioe); + if (pause) Threads.sleep(pauseTime); + tries++; if (rssStub == rss) { rssStub = null; } } } - return procId; - } - - @Override - public boolean isProcedureFinished(final long procId) throws IOException { - GetProcedureResultRequest request = - GetProcedureResultRequest.newBuilder().setProcId(procId).build(); - - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - // TODO: find a way to get proc result - GetProcedureResultResponse response = rss.getProcedureResult(null, request); - - if (response == null) { - LOG.warn("Failed to get procedure (id=" + procId + ") status."); - return false; - } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - return false; - } else if (response.hasException()) { - // Procedure failed. - throw ForeignExceptionUtil.toIOException(response.getException()); - } - // Procedure completes successfully - break; - } catch (ServiceException se) { - // TODO: retry or just fail - IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.warn("Failed to get split region procedure result. Retrying", ioe); - if (rssStub == rss) { - rssStub = null; - } - } + if (LOG.isTraceEnabled()) { + LOG.trace("TRANSITION NOT REPORTED " + request); } - return true; + return false; } /** @@ -2981,7 +2927,7 @@ public class HRegionServer extends HasThread implements * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ public static void main(String[] args) throws Exception { - LOG.info("***** STARTING service '" + HRegionServer.class.getSimpleName() + "' *****"); + LOG.info("STARTING service '" + HRegionServer.class.getSimpleName()); VersionInfo.logVersion(); Configuration conf = HBaseConfiguration.create(); @SuppressWarnings("unchecked") @@ -3286,7 +3232,7 @@ public class HRegionServer extends HasThread implements throw new RegionOpeningException("Region " + regionNameStr + " is opening on " + this.serverName); } - throw new NotServingRegionException("Region " + regionNameStr + + throw new NotServingRegionException("" + regionNameStr + " is not online on " + this.serverName); } return region; @@ -3404,7 +3350,7 @@ public class HRegionServer extends HasThread implements } // This map will contains all the regions that we closed for a move. - // We add the time it was moved as we don't want to keep too old information + // We add the time it was moved as we don't want to keep too old information protected Map movedRegions = new ConcurrentHashMap<>(3000); @@ -3516,9 +3462,9 @@ public class HRegionServer extends HasThread implements } /** - * @return the underlying {@link CompactSplitThread} for the servers + * @return the underlying {@link CompactSplit} for the servers */ - public CompactSplitThread getCompactSplitThread() { + public CompactSplit getCompactSplitThread() { return this.compactSplitThread; }