hbase-commits mailing list archives

From st...@apache.org
Subject [06/21] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)
Date Thu, 23 Mar 2017 15:43:10 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
new file mode 100644
index 0000000..1e58b9c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -0,0 +1,736 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Compact region on request and then run split if appropriate
+ */
+@InterfaceAudience.Private
+public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver {
+  private static final Log LOG = LogFactory.getLog(CompactSplit.class);
+
+  // Configuration key for the large compaction threads.
+  public final static String LARGE_COMPACTION_THREADS =
+      "hbase.regionserver.thread.compaction.large";
+  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
+
+  // Configuration key for the small compaction threads.
+  public final static String SMALL_COMPACTION_THREADS =
+      "hbase.regionserver.thread.compaction.small";
+  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
+
+  // Configuration key for split threads
+  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
+  public final static int SPLIT_THREADS_DEFAULT = 1;
+
+  // Configuration keys for merge threads
+  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
+  public final static int MERGE_THREADS_DEFAULT = 1;
+
+  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
+      "hbase.regionserver.regionSplitLimit";
+  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
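
The keys above are ordinary Hadoop Configuration entries, so they can be
overridden before the region server starts. A hypothetical tuning sketch
(the values shown are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    public class CompactSplitTuning {
      // Build a Configuration that sizes the CompactSplit pools explicitly.
      public static Configuration tuned() {
        Configuration conf = new Configuration();
        conf.setInt("hbase.regionserver.thread.compaction.large", 2);
        conf.setInt("hbase.regionserver.thread.compaction.small", 4);
        conf.setInt("hbase.regionserver.thread.split", 2);
        conf.setInt("hbase.regionserver.regionSplitLimit", 2000);
        return conf;
      }
    }
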
+
+  private final HRegionServer server;
+  private final Configuration conf;
+
+  private final ThreadPoolExecutor longCompactions;
+  private final ThreadPoolExecutor shortCompactions;
+  private final ThreadPoolExecutor splits;
+  private final ThreadPoolExecutor mergePool;
+
+  private volatile ThroughputController compactionThroughputController;
+
+  /**
+   * Splitting should not take place if the total number of regions exceeds this.
+   * This is not a hard limit on the number of regions, but a guideline to
+   * stop splitting once the number of online regions is greater than this.
+   */
+  private int regionSplitLimit;
+
+  /** @param server the parent region server */
+  CompactSplit(HRegionServer server) {
+    super();
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
+        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
+
+    int largeThreads = Math.max(1, conf.getInt(
+        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(
+        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+
+    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
+
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+    final String n = Thread.currentThread().getName();
+
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+        60, TimeUnit.SECONDS, stealJobQueue,
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-longCompactions-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+      });
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
+        60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-shortCompactions-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+      });
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+    this.splits = (ThreadPoolExecutor)
+        Executors.newFixedThreadPool(splitThreads,
+            new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-splits-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+      });
+    int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
+    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
+        mergeThreads, new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-merges-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+        });
+
+    // compaction throughput controller
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, conf);
+  }
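
The constructor wires the two compaction pools to a shared StealJobQueue: the
long pool consumes stealJobQueue directly, while the short pool consumes
stealJobQueue.getStealFromQueue(), so an idle long-compaction thread can pick
up queued small compactions (see shutdownLongCompactions() below). A minimal
sketch of that work-stealing idea in plain JDK types — the class and field
names here are illustrative, not the HBase StealJobQueue API:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class StealingWorker implements Runnable {
      private final BlockingQueue<Runnable> own;    // this pool's queue
      private final BlockingQueue<Runnable> victim; // queue to steal from when idle

      public StealingWorker(BlockingQueue<Runnable> own, BlockingQueue<Runnable> victim) {
        this.own = own;
        this.victim = victim;
      }

      @Override
      public void run() {
        try {
          while (!Thread.currentThread().isInterrupted()) {
            // Prefer our own work; after a short idle wait, steal one job.
            Runnable job = own.poll(100, TimeUnit.MILLISECONDS);
            if (job == null) {
              job = victim.poll();
            }
            if (job != null) {
              job.run();
            }
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // restore flag and exit
        }
      }
    }
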
+
+  @Override
+  public String toString() {
+    return "compaction_queue=("
+        + longCompactions.getQueue().size() + ":"
+        + shortCompactions.getQueue().size() + ")"
+        + ", split_queue=" + splits.getQueue().size()
+        + ", merge_queue=" + mergePool.getQueue().size();
+  }
+
+  public String dumpQueue() {
+    StringBuffer queueLists = new StringBuffer();
+    queueLists.append("Compaction/Split Queue dump:\n");
+    queueLists.append("  LargeCompation Queue:\n");
+    BlockingQueue<Runnable> lq = longCompactions.getQueue();
+    Iterator<Runnable> it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    if (shortCompactions != null) {
+      queueLists.append("\n");
+      queueLists.append("  SmallCompation Queue:\n");
+      lq = shortCompactions.getQueue();
+      it = lq.iterator();
+      while (it.hasNext()) {
+        queueLists.append("    " + it.next().toString());
+        queueLists.append("\n");
+      }
+    }
+
+    queueLists.append("\n");
+    queueLists.append("  Split Queue:\n");
+    lq = splits.getQueue();
+    it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    queueLists.append("\n");
+    queueLists.append("  Region Merge Queue:\n");
+    lq = mergePool.getQueue();
+    it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    return queueLists.toString();
+  }
+
+  public synchronized void requestRegionsMerge(final Region a,
+      final Region b, final boolean forcible, long masterSystemTime, User user) {
+    try {
+      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime, user));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+            + forcible + ".  " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
+          + forcible, ree);
+    }
+  }
+
+  public synchronized boolean requestSplit(final Region r) {
+    // don't split regions that are blocking
+    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
+      byte[] midKey = ((HRegion)r).checkSplit();
+      if (midKey != null) {
+        requestSplit(r, midKey);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public synchronized void requestSplit(final Region r, byte[] midKey) {
+    requestSplit(r, midKey, null);
+  }
+
+  /*
+   * The User parameter allows the split thread to assume the correct user identity
+   */
+  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
+    if (midKey == null) {
+      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
+        " not splittable because midkey=null");
+      if (((HRegion)r).shouldForceSplit()) {
+        ((HRegion)r).clearSplit();
+      }
+      return;
+    }
+    try {
+      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Splitting " + r + ", " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.info("Could not execute split for " + r, ree);
+    }
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
+      throws IOException {
+    return requestCompaction(r, why, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
+      List<Pair<CompactionRequest, Store>> requests) throws IOException {
+    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
+  }
+
+  @Override
+  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
+      final String why, CompactionRequest request) throws IOException {
+    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
+    return requestCompactionInternal(r, why, p, requests, true, user);
+  }
+
+  private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
+          throws IOException {
+    // not a special compaction request, so make our own list
+    List<CompactionRequest> ret = null;
+    if (requests == null) {
+      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
+      for (Store s : r.getStores()) {
+        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
+        if (selectNow) ret.add(cr);
+      }
+    } else {
+      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
+      ret = new ArrayList<CompactionRequest>(requests.size());
+      for (Pair<CompactionRequest, Store> pair : requests) {
+        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
+      }
+    }
+    return ret;
+  }
+
+  public CompactionRequest requestCompaction(final Region r, final Store s,
+      final String why, int priority, CompactionRequest request, User user) throws IOException {
+    return requestCompactionInternal(r, s, why, priority, request, true, user);
+  }
+
+  public synchronized void requestSystemCompaction(
+      final Region r, final String why) throws IOException {
+    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
+  }
+
+  public void requestSystemCompaction(
+      final Region r, final Store s, final String why) throws IOException {
+    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
+  }
+
+  /**
+   * @param r region store belongs to
+   * @param s Store to request compaction on
+   * @param why Why compaction requested -- used in debug messages
+   * @param priority override the default priority (NO_PRIORITY == decide)
+   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
+   *          compaction will be used.
+   */
+  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
+      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
+          throws IOException {
+    if (this.server.isStopped()
+        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
+      return null;
+    }
+
+    CompactionContext compaction = null;
+    if (selectNow) {
+      compaction = selectCompaction(r, s, priority, request, user);
+      if (compaction == null) return null; // message logged inside
+    }
+
+    // We assume that most compactions are small. So, put system compactions into small
+    // pool; we will do selection there, and move to large pool if necessary.
+    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
+      ? longCompactions : shortCompactions;
+    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == shortCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
+    }
+    return selectNow ? compaction.getRequest() : null;
+  }
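
The routing rule here: a request that already has a selection and whose size
trips Store.throttleCompaction() goes to the long pool; everything else,
including all system requests (selectNow == false), starts in the short pool.
A sketch of the same decision with the HBase types reduced to a plain byte
threshold (names are illustrative):

    import java.util.concurrent.ThreadPoolExecutor;

    final class PoolRouter {
      private final ThreadPoolExecutor longPool;
      private final ThreadPoolExecutor shortPool;
      private final long throttlePoint; // bytes; selections at or above this are "large"

      PoolRouter(ThreadPoolExecutor longPool, ThreadPoolExecutor shortPool, long throttlePoint) {
        this.longPool = longPool;
        this.shortPool = shortPool;
        this.throttlePoint = throttlePoint;
      }

      // System compactions (no selection yet) always start in the short pool;
      // they may be re-routed after selection, as doCompaction() does below.
      ThreadPoolExecutor poolFor(boolean selected, long selectionSize) {
        return (selected && selectionSize >= throttlePoint) ? longPool : shortPool;
      }
    }
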
+
+  private CompactionContext selectCompaction(final Region r, final Store s,
+      int priority, CompactionRequest request, User user) throws IOException {
+    CompactionContext compaction = s.requestCompaction(priority, request, user);
+    if (compaction == null) {
+      if (LOG.isDebugEnabled() && r.getRegionInfo() != null) {
+        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
+            " because compaction request was cancelled");
+      }
+      return null;
+    }
+    assert compaction.hasSelection();
+    if (priority != Store.NO_PRIORITY) {
+      compaction.getRequest().setPriority(priority);
+    }
+    return compaction;
+  }
+
+  /**
+   * Only interrupt once it's done with a run through the work loop.
+   */
+  void interruptIfNecessary() {
+    splits.shutdown();
+    mergePool.shutdown();
+    longCompactions.shutdown();
+    shortCompactions.shutdown();
+  }
+
+  private void waitFor(ThreadPoolExecutor t, String name) {
+    boolean done = false;
+    while (!done) {
+      try {
+        done = t.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.info("Waiting for " + name + " to finish...");
+        if (!done) {
+          t.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted waiting for " + name + " to finish...");
+      }
+    }
+  }
+
+  void join() {
+    waitFor(splits, "Split Thread");
+    waitFor(mergePool, "Merge Thread");
+    waitFor(longCompactions, "Large Compaction Thread");
+    waitFor(shortCompactions, "Small Compaction Thread");
+  }
+
+  /**
+   * Returns the current size of the compaction queues, i.e. the number of
+   * compaction requests waiting in the large and small pools combined.
+   *
+   * @return The current size of the compaction queues.
+   */
+  public int getCompactionQueueSize() {
+    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
+  }
+
+  public int getLargeCompactionQueueSize() {
+    return longCompactions.getQueue().size();
+  }
+
+
+  public int getSmallCompactionQueueSize() {
+    return shortCompactions.getQueue().size();
+  }
+
+  public int getSplitQueueSize() {
+    return splits.getQueue().size();
+  }
+
+  private boolean shouldSplitRegion() {
+    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
+      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
+          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
+    }
+    return (regionSplitLimit > server.getNumberOfOnlineRegions());
+  }
+
+  /**
+   * @return the regionSplitLimit
+   */
+  public int getRegionSplitLimit() {
+    return this.regionSplitLimit;
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification="Contrived use of compareTo")
+  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+    private final Store store;
+    private final HRegion region;
+    private CompactionContext compaction;
+    private int queuedPriority;
+    private ThreadPoolExecutor parent;
+    private User user;
+
+    public CompactionRunner(Store store, Region region,
+        CompactionContext compaction, ThreadPoolExecutor parent, User user) {
+      super();
+      this.store = store;
+      this.region = (HRegion)region;
+      this.compaction = compaction;
+      this.queuedPriority = (this.compaction == null)
+          ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.parent = parent;
+      this.user = user;
+    }
+
+    @Override
+    public String toString() {
+      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
+          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
+    }
+
+    private void doCompaction(User user) {
+      // Common case - system compaction without a file selection. Select now.
+      if (this.compaction == null) {
+        int oldPriority = this.queuedPriority;
+        this.queuedPriority = this.store.getCompactPriority();
+        if (this.queuedPriority > oldPriority) {
+          // Store priority decreased while we were in queue (due to some other compaction?),
+          // requeue with new priority to avoid blocking potential higher priorities.
+          this.parent.execute(this);
+          return;
+        }
+        try {
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
+        } catch (IOException ex) {
+          LOG.error("Compaction selection failed " + this, ex);
+          server.checkFileSystem();
+          return;
+        }
+        if (this.compaction == null) return; // nothing to do
+        // Now see if we are in correct pool for the size; if not, go to the correct one.
+        // We might end up waiting for a while, so cancel the selection.
+        assert this.compaction.hasSelection();
+        ThreadPoolExecutor pool = store.throttleCompaction(
+            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
+
+        // Long compaction pool can process small job
+        // Short compaction pool should not process large job
+        if (this.parent == shortCompactions && pool == longCompactions) {
+          this.store.cancelRequestedCompaction(this.compaction);
+          this.compaction = null;
+          this.parent = pool;
+          this.parent.execute(this);
+          return;
+        }
+      }
+      // Finally we can compact something.
+      assert this.compaction != null;
+
+      this.compaction.getRequest().beforeExecute();
+      try {
+        // Note: please don't put single-compaction logic here;
+        //       put it into region/store/etc. This is CST logic.
+        long start = EnvironmentEdgeManager.currentTime();
+        boolean completed =
+            region.compact(compaction, store, compactionThroughputController, user);
+        long now = EnvironmentEdgeManager.currentTime();
+        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
+              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+        if (completed) {
+          // degenerate case: blocked regions require recursive enqueues
+          if (store.getCompactPriority() <= 0) {
+            requestSystemCompaction(region, store, "Recursive enqueue");
+          } else {
+            // see if the compaction has caused us to exceed max region size
+            requestSplit(region);
+          }
+        }
+      } catch (IOException ex) {
+        IOException remoteEx =
+            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
+        LOG.error("Compaction failed " + this, remoteEx);
+        if (remoteEx != ex) {
+          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
+        }
+        region.reportCompactionRequestFailure();
+        server.checkFileSystem();
+      } catch (Exception ex) {
+        LOG.error("Compaction failed " + this, ex);
+        region.reportCompactionRequestFailure();
+        server.checkFileSystem();
+      } finally {
+        LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
+      }
+      this.compaction.getRequest().afterExecute();
+    }
+
+    @Override
+    public void run() {
+      Preconditions.checkNotNull(server);
+      if (server.isStopped()
+          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
+        return;
+      }
+      doCompaction(user);
+    }
+
+    private String formatStackTrace(Exception ex) {
+      StringWriter sw = new StringWriter();
+      PrintWriter pw = new PrintWriter(sw);
+      ex.printStackTrace(pw);
+      pw.flush();
+      return sw.toString();
+    }
+
+    @Override
+    public int compareTo(CompactionRunner o) {
+      // Only compare the underlying request (if any), for queue sorting purposes.
+      int compareVal = queuedPriority - o.queuedPriority; // compare priority
+      if (compareVal != 0) return compareVal;
+      CompactionContext tc = this.compaction, oc = o.compaction;
+      // Sort pre-selected (user?) compactions before system ones with equal priority.
+      return (tc == null) ? ((oc == null) ? 0 : 1)
+          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
+    }
+  }
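
The compareTo() above gives the queues their order: a numerically lower
queuedPriority runs first, and at equal priority a runner that already holds a
selection sorts ahead of a system runner that has none. A minimal,
self-contained sketch of that contract (the Job class is illustrative; it uses
Integer.compare rather than subtraction to avoid overflow):

    import java.util.PriorityQueue;

    final class Job implements Comparable<Job> {
      final int priority;
      final boolean selected; // stands in for (compaction != null)

      Job(int priority, boolean selected) {
        this.priority = priority;
        this.selected = selected;
      }

      @Override
      public int compareTo(Job o) {
        int c = Integer.compare(priority, o.priority); // lower value runs first
        if (c != 0) return c;
        return Boolean.compare(!selected, !o.selected); // selected before system
      }

      public static void main(String[] args) {
        PriorityQueue<Job> q = new PriorityQueue<>();
        q.add(new Job(5, false));
        q.add(new Job(5, true));
        q.add(new Job(1, false));
        // Drains as: pri=1 system, then pri=5 selected, then pri=5 system.
        while (!q.isEmpty()) {
          Job j = q.poll();
          System.out.println("pri=" + j.priority + " selected=" + j.selected);
        }
      }
    }
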
+
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private static class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      if (runnable instanceof CompactionRunner) {
+        CompactionRunner runner = (CompactionRunner)runnable;
+        LOG.debug("Compaction Rejected: " + runner);
+        runner.store.cancelRequestedCompaction(runner.compaction);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    // Check if number of large / small compaction threads has changed, and then
+    // adjust the core pool size of the thread pools, by using the
+    // setCorePoolSize() method. According to the javadocs, it is safe to
+    // change the core pool size on-the-fly. We need to reset the maximum
+    // pool size, as well.
+    int largeThreads = Math.max(1, newConf.getInt(
+            LARGE_COMPACTION_THREADS,
+            LARGE_COMPACTION_THREADS_DEFAULT));
+    if (this.longCompactions.getCorePoolSize() != largeThreads) {
+      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
+              " from " + this.longCompactions.getCorePoolSize() + " to " +
+              largeThreads);
+      if (this.longCompactions.getCorePoolSize() < largeThreads) {
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+        this.longCompactions.setCorePoolSize(largeThreads);
+      } else {
+        this.longCompactions.setCorePoolSize(largeThreads);
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+      }
+    }
+
+    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
+            SMALL_COMPACTION_THREADS_DEFAULT);
+    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
+      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
+                " from " + this.shortCompactions.getCorePoolSize() + " to " +
+                smallThreads);
+      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+        this.shortCompactions.setCorePoolSize(smallThreads);
+      } else {
+        this.shortCompactions.setCorePoolSize(smallThreads);
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+      }
+    }
+
+    int splitThreads = newConf.getInt(SPLIT_THREADS,
+            SPLIT_THREADS_DEFAULT);
+    if (this.splits.getCorePoolSize() != splitThreads) {
+      LOG.info("Changing the value of " + SPLIT_THREADS +
+                " from " + this.splits.getCorePoolSize() + " to " +
+                splitThreads);
+      if (this.splits.getCorePoolSize() < splitThreads) {
+        this.splits.setMaximumPoolSize(splitThreads);
+        this.splits.setCorePoolSize(splitThreads);
+      } else {
+        this.splits.setCorePoolSize(splitThreads);
+        this.splits.setMaximumPoolSize(splitThreads);
+      }
+    }
+
+    int mergeThreads = newConf.getInt(MERGE_THREADS,
+            MERGE_THREADS_DEFAULT);
+    if (this.mergePool.getCorePoolSize() != mergeThreads) {
+      LOG.info("Changing the value of " + MERGE_THREADS +
+                " from " + this.mergePool.getCorePoolSize() + " to " +
+                mergeThreads);
+      if (this.mergePool.getCorePoolSize() < mergeThreads) {
+        this.mergePool.setMaximumPoolSize(mergeThreads);
+        this.mergePool.setCorePoolSize(mergeThreads);
+      } else {
+        this.mergePool.setCorePoolSize(mergeThreads);
+        this.mergePool.setMaximumPoolSize(mergeThreads);
+      }
+    }
+
+    ThroughputController old = this.compactionThroughputController;
+    if (old != null) {
+      old.stop("configuration change");
+    }
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, newConf);
+
+    // We change this atomically here instead of reloading the config in order that upstream
+    // would be the only one with the flexibility to reload the config.
+    this.conf.reloadConfiguration();
+  }
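
The grow/shrink branches above all follow one ordering rule: when growing a
pool, raise maximumPoolSize before corePoolSize; when shrinking, lower
corePoolSize first. Either way the core size never exceeds the maximum at any
intermediate step. The same rule, factored into a small helper (the helper
name is illustrative):

    import java.util.concurrent.ThreadPoolExecutor;

    final class PoolResizer {
      static void resize(ThreadPoolExecutor pool, int newSize) {
        if (pool.getCorePoolSize() < newSize) {
          pool.setMaximumPoolSize(newSize); // grow: max first, then core
          pool.setCorePoolSize(newSize);
        } else {
          pool.setCorePoolSize(newSize);    // shrink: core first, then max
          pool.setMaximumPoolSize(newSize);
        }
      }
    }
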
+
+  protected int getSmallCompactionThreadNum() {
+    return this.shortCompactions.getCorePoolSize();
+  }
+
+  protected int getLargeCompactionThreadNum() {
+    return this.longCompactions.getCorePoolSize();
+  }
+
+  protected int getSplitThreadNum() {
+    return this.splits.getCorePoolSize();
+  }
+
+  protected int getMergeThreadNum() {
+    return this.mergePool.getCorePoolSize();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerChildren(ConfigurationManager manager) {
+    // No children to register.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deregisterChildren(ConfigurationManager manager) {
+    // No children to deregister.
+  }
+
+  @VisibleForTesting
+  public ThroughputController getCompactionThroughputController() {
+    return compactionThroughputController;
+  }
+
+  @VisibleForTesting
+  public long getCompletedMergeTaskCount() {
+    return mergePool.getCompletedTaskCount();
+  }
+
+  /**
+   * Shutdown the long compaction thread pool.
+   * Should only be used in unit tests, to prevent the long compaction thread
+   * pool from stealing jobs from the short compaction queue.
+   */
+  @VisibleForTesting
+  void shutdownLongCompactions() {
+    this.longCompactions.shutdown();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
deleted file mode 100644
index eba984a..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ /dev/null
@@ -1,722 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.conf.ConfigurationManager;
-import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
-import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.StealJobQueue;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * Compact region on request and then run split if appropriate
- */
-@InterfaceAudience.Private
-public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
-  private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
-
-  // Configuration key for the large compaction threads.
-  public final static String LARGE_COMPACTION_THREADS =
-      "hbase.regionserver.thread.compaction.large";
-  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
-  
-  // Configuration key for the small compaction threads.
-  public final static String SMALL_COMPACTION_THREADS =
-      "hbase.regionserver.thread.compaction.small";
-  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
-  
-  // Configuration key for split threads
-  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
-  public final static int SPLIT_THREADS_DEFAULT = 1;
-  
-  // Configuration keys for merge threads
-  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
-  public final static int MERGE_THREADS_DEFAULT = 1;
-
-  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
-      "hbase.regionserver.regionSplitLimit";
-  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
-
-  private final HRegionServer server;
-  private final Configuration conf;
-
-  private final ThreadPoolExecutor longCompactions;
-  private final ThreadPoolExecutor shortCompactions;
-  private final ThreadPoolExecutor splits;
-  private final ThreadPoolExecutor mergePool;
-
-  private volatile ThroughputController compactionThroughputController;
-
-  /**
-   * Splitting should not take place if the total number of regions exceed this.
-   * This is not a hard limit to the number of regions but it is a guideline to
-   * stop splitting after number of online regions is greater than this.
-   */
-  private int regionSplitLimit;
-
-  /** @param server */
-  CompactSplitThread(HRegionServer server) {
-    super();
-    this.server = server;
-    this.conf = server.getConfiguration();
-    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
-        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
-
-    int largeThreads = Math.max(1, conf.getInt(
-        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
-    int smallThreads = conf.getInt(
-        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
-
-    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
-
-    // if we have throttle threads, make sure the user also specified size
-    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
-
-    final String n = Thread.currentThread().getName();
-
-    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
-    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
-        60, TimeUnit.SECONDS, stealJobQueue,
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-longCompactions-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-      });
-    this.longCompactions.setRejectedExecutionHandler(new Rejection());
-    this.longCompactions.prestartAllCoreThreads();
-    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
-        60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-shortCompactions-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-      });
-    this.shortCompactions
-        .setRejectedExecutionHandler(new Rejection());
-    this.splits = (ThreadPoolExecutor)
-        Executors.newFixedThreadPool(splitThreads,
-            new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-splits-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-      });
-    int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
-    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
-        mergeThreads, new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            String name = n + "-merges-" + System.currentTimeMillis();
-            return new Thread(r, name);
-          }
-        });
-
-    // compaction throughput controller
-    this.compactionThroughputController =
-        CompactionThroughputControllerFactory.create(server, conf);
-  }
-
-  @Override
-  public String toString() {
-    return "compaction_queue=("
-        + longCompactions.getQueue().size() + ":"
-        + shortCompactions.getQueue().size() + ")"
-        + ", split_queue=" + splits.getQueue().size()
-        + ", merge_queue=" + mergePool.getQueue().size();
-  }
-  
-  public String dumpQueue() {
-    StringBuffer queueLists = new StringBuffer();
-    queueLists.append("Compaction/Split Queue dump:\n");
-    queueLists.append("  LargeCompation Queue:\n");
-    BlockingQueue<Runnable> lq = longCompactions.getQueue();
-    Iterator<Runnable> it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
-    if (shortCompactions != null) {
-      queueLists.append("\n");
-      queueLists.append("  SmallCompation Queue:\n");
-      lq = shortCompactions.getQueue();
-      it = lq.iterator();
-      while (it.hasNext()) {
-        queueLists.append("    " + it.next().toString());
-        queueLists.append("\n");
-      }
-    }
-
-    queueLists.append("\n");
-    queueLists.append("  Split Queue:\n");
-    lq = splits.getQueue();
-    it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
-    queueLists.append("\n");
-    queueLists.append("  Region Merge Queue:\n");
-    lq = mergePool.getQueue();
-    it = lq.iterator();
-    while (it.hasNext()) {
-      queueLists.append("    " + it.next().toString());
-      queueLists.append("\n");
-    }
-
-    return queueLists.toString();
-  }
-
-  public synchronized boolean requestSplit(final Region r) {
-    // don't split regions that are blocking
-    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
-      byte[] midKey = ((HRegion)r).checkSplit();
-      if (midKey != null) {
-        requestSplit(r, midKey);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public synchronized void requestSplit(final Region r, byte[] midKey) {
-    requestSplit(r, midKey, null);
-  }
-
-  /*
-   * The User parameter allows the split thread to assume the correct user identity
-   */
-  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
-    if (midKey == null) {
-      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
-        " not splittable because midkey=null");
-      if (((HRegion)r).shouldForceSplit()) {
-        ((HRegion)r).clearSplit();
-      }
-      return;
-    }
-    try {
-      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Split requested for " + r + ".  " + this);
-      }
-    } catch (RejectedExecutionException ree) {
-      LOG.info("Could not execute split for " + r, ree);
-    }
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
-      throws IOException {
-    return requestCompaction(r, why, null);
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      List<Pair<CompactionRequest, Store>> requests) throws IOException {
-    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
-  }
-
-  @Override
-  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, CompactionRequest request) throws IOException {
-    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
-    return requestCompactionInternal(r, why, p, requests, true, user);
-  }
-
-  private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
-          throws IOException {
-    // not a special compaction request, so make our own list
-    List<CompactionRequest> ret = null;
-    if (requests == null) {
-      ret = selectNow ? new ArrayList<>(r.getStores().size()) : null;
-      for (Store s : r.getStores()) {
-        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
-        if (selectNow) ret.add(cr);
-      }
-    } else {
-      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
-      ret = new ArrayList<>(requests.size());
-      for (Pair<CompactionRequest, Store> pair : requests) {
-        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
-      }
-    }
-    return ret;
-  }
-
-  public CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, User user) throws IOException {
-    return requestCompactionInternal(r, s, why, priority, request, true, user);
-  }
-
-  public synchronized void requestSystemCompaction(
-      final Region r, final String why) throws IOException {
-    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  public void requestSystemCompaction(
-      final Region r, final Store s, final String why) throws IOException {
-    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  /**
-   * @param r region store belongs to
-   * @param s Store to request compaction on
-   * @param why Why compaction requested -- used in debug messages
-   * @param priority override the default priority (NO_PRIORITY == decide)
-   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
-   *          compaction will be used.
-   */
-  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
-          throws IOException {
-    if (this.server.isStopped()
-        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
-      return null;
-    }
-
-    CompactionContext compaction = null;
-    if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request, user);
-      if (compaction == null) return null; // message logged inside
-    }
-
-    // We assume that most compactions are small. So, put system compactions into small
-    // pool; we will do selection there, and move to large pool if necessary.
-    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
-      ? longCompactions : shortCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
-    if (LOG.isDebugEnabled()) {
-      String type = (pool == shortCompactions) ? "Small " : "Large ";
-      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
-          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
-    }
-    return selectNow ? compaction.getRequest() : null;
-  }
-
-  private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request, User user) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request, user);
-    if (compaction == null) {
-      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
-        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
-            " because compaction request was cancelled");
-      }
-      return null;
-    }
-    assert compaction.hasSelection();
-    if (priority != Store.NO_PRIORITY) {
-      compaction.getRequest().setPriority(priority);
-    }
-    return compaction;
-  }
-
-  /**
-   * Only interrupt once it's done with a run through the work loop.
-   */
-  void interruptIfNecessary() {
-    splits.shutdown();
-    mergePool.shutdown();
-    longCompactions.shutdown();
-    shortCompactions.shutdown();
-  }
-
-  private void waitFor(ThreadPoolExecutor t, String name) {
-    boolean done = false;
-    while (!done) {
-      try {
-        done = t.awaitTermination(60, TimeUnit.SECONDS);
-        LOG.info("Waiting for " + name + " to finish...");
-        if (!done) {
-          t.shutdownNow();
-        }
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted waiting for " + name + " to finish...");
-      }
-    }
-  }
-
-  void join() {
-    waitFor(splits, "Split Thread");
-    waitFor(mergePool, "Merge Thread");
-    waitFor(longCompactions, "Large Compaction Thread");
-    waitFor(shortCompactions, "Small Compaction Thread");
-  }
-
-  /**
-   * Returns the current size of the queue containing regions that are
-   * processed.
-   *
-   * @return The current size of the regions queue.
-   */
-  public int getCompactionQueueSize() {
-    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
-  }
-
-  public int getLargeCompactionQueueSize() {
-    return longCompactions.getQueue().size();
-  }
-
-
-  public int getSmallCompactionQueueSize() {
-    return shortCompactions.getQueue().size();
-  }
-
-  public int getSplitQueueSize() {
-    return splits.getQueue().size();
-  }
-
-  private boolean shouldSplitRegion() {
-    if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
-      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
-          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
-    }
-    return (regionSplitLimit > server.getNumberOfOnlineRegions());
-  }
-
-  /**
-   * @return the regionSplitLimit
-   */
-  public int getRegionSplitLimit() {
-    return this.regionSplitLimit;
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
-      justification="Contrived use of compareTo")
-  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
-    private final Store store;
-    private final HRegion region;
-    private CompactionContext compaction;
-    private int queuedPriority;
-    private ThreadPoolExecutor parent;
-    private User user;
-
-    public CompactionRunner(Store store, Region region,
-        CompactionContext compaction, ThreadPoolExecutor parent, User user) {
-      super();
-      this.store = store;
-      this.region = (HRegion)region;
-      this.compaction = compaction;
-      this.queuedPriority = (this.compaction == null)
-          ? store.getCompactPriority() : compaction.getRequest().getPriority();
-      this.parent = parent;
-      this.user = user;
-    }
-
-    @Override
-    public String toString() {
-      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
-          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
-    }
-
-    private void doCompaction(User user) {
-      // Common case - system compaction without a file selection. Select now.
-      if (this.compaction == null) {
-        int oldPriority = this.queuedPriority;
-        this.queuedPriority = this.store.getCompactPriority();
-        if (this.queuedPriority > oldPriority) {
-          // Store priority decreased while we were in queue (due to some other compaction?),
-          // requeue with new priority to avoid blocking potential higher priorities.
-          this.parent.execute(this);
-          return;
-        }
-        try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
-        } catch (IOException ex) {
-          LOG.error("Compaction selection failed " + this, ex);
-          server.checkFileSystem();
-          return;
-        }
-        if (this.compaction == null) return; // nothing to do
-        // Now see if we are in correct pool for the size; if not, go to the correct one.
-        // We might end up waiting for a while, so cancel the selection.
-        assert this.compaction.hasSelection();
-        ThreadPoolExecutor pool = store.throttleCompaction(
-            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
-
-        // Long compaction pool can process small job
-        // Short compaction pool should not process large job
-        if (this.parent == shortCompactions && pool == longCompactions) {
-          this.store.cancelRequestedCompaction(this.compaction);
-          this.compaction = null;
-          this.parent = pool;
-          this.parent.execute(this);
-          return;
-        }
-      }
-      // Finally we can compact something.
-      assert this.compaction != null;
-
-      this.compaction.getRequest().beforeExecute();
-      try {
-        // Note: please don't put single-compaction logic here;
-        //       put it into region/store/etc. This is CST logic.
-        long start = EnvironmentEdgeManager.currentTime();
-        boolean completed =
-            region.compact(compaction, store, compactionThroughputController, user);
-        long now = EnvironmentEdgeManager.currentTime();
-        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
-              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
-        if (completed) {
-          // degenerate case: blocked regions require recursive enqueues
-          if (store.getCompactPriority() <= 0) {
-            requestSystemCompaction(region, store, "Recursive enqueue");
-          } else {
-            // see if the compaction has caused us to exceed max region size
-            requestSplit(region);
-          }
-        }
-      } catch (IOException ex) {
-        IOException remoteEx =
-            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
-        LOG.error("Compaction failed " + this, remoteEx);
-        if (remoteEx != ex) {
-          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
-        }
-        region.reportCompactionRequestFailure();
-        server.checkFileSystem();
-      } catch (Exception ex) {
-        LOG.error("Compaction failed " + this, ex);
-        region.reportCompactionRequestFailure();
-        server.checkFileSystem();
-      } finally {
-        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
-      }
-      this.compaction.getRequest().afterExecute();
-    }
-
-    @Override
-    public void run() {
-      Preconditions.checkNotNull(server);
-      if (server.isStopped()
-          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
-        return;
-      }
-      doCompaction(user);
-    }
-
-    private String formatStackTrace(Exception ex) {
-      StringWriter sw = new StringWriter();
-      PrintWriter pw = new PrintWriter(sw);
-      ex.printStackTrace(pw);
-      pw.flush();
-      return sw.toString();
-    }
-
-    @Override
-    public int compareTo(CompactionRunner o) {
-      // Only compare the underlying request (if any), for queue sorting purposes.
-      int compareVal = queuedPriority - o.queuedPriority; // compare priority
-      if (compareVal != 0) return compareVal;
-      CompactionContext tc = this.compaction, oc = o.compaction;
-      // Sort pre-selected (user?) compactions before system ones with equal priority.
-      return (tc == null) ? ((oc == null) ? 0 : 1)
-          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
-    }
-  }
-
-  /**
-   * Cleanup class to use when rejecting a compaction request from the queue.
-   */
-  private static class Rejection implements RejectedExecutionHandler {
-    @Override
-    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
-      if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner)runnable;
-        LOG.debug("Compaction Rejected: " + runner);
-        runner.store.cancelRequestedCompaction(runner.compaction);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void onConfigurationChange(Configuration newConf) {
-    // Check if number of large / small compaction threads has changed, and then
-    // adjust the core pool size of the thread pools, by using the
-    // setCorePoolSize() method. According to the javadocs, it is safe to
-    // change the core pool size on-the-fly. We need to reset the maximum
-    // pool size, as well.
-    int largeThreads = Math.max(1, newConf.getInt(
-            LARGE_COMPACTION_THREADS,
-            LARGE_COMPACTION_THREADS_DEFAULT));
-    if (this.longCompactions.getCorePoolSize() != largeThreads) {
-      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
-              " from " + this.longCompactions.getCorePoolSize() + " to " +
-              largeThreads);
-      if(this.longCompactions.getCorePoolSize() < largeThreads) {
-        this.longCompactions.setMaximumPoolSize(largeThreads);
-        this.longCompactions.setCorePoolSize(largeThreads);
-      } else {
-        this.longCompactions.setCorePoolSize(largeThreads);
-        this.longCompactions.setMaximumPoolSize(largeThreads);
-      }
-    }
-
-    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
-            SMALL_COMPACTION_THREADS_DEFAULT);
-    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
-      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
-                " from " + this.shortCompactions.getCorePoolSize() + " to " +
-                smallThreads);
-      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
-        this.shortCompactions.setMaximumPoolSize(smallThreads);
-        this.shortCompactions.setCorePoolSize(smallThreads);
-      } else {
-        this.shortCompactions.setCorePoolSize(smallThreads);
-        this.shortCompactions.setMaximumPoolSize(smallThreads);
-      }
-    }
-
-    int splitThreads = newConf.getInt(SPLIT_THREADS,
-            SPLIT_THREADS_DEFAULT);
-    if (this.splits.getCorePoolSize() != splitThreads) {
-      LOG.info("Changing the value of " + SPLIT_THREADS +
-                " from " + this.splits.getCorePoolSize() + " to " +
-                splitThreads);
-      if(this.splits.getCorePoolSize() < splitThreads) {
-        this.splits.setMaximumPoolSize(splitThreads);
-        this.splits.setCorePoolSize(splitThreads);
-      } else {
-        this.splits.setCorePoolSize(splitThreads);
-        this.splits.setMaximumPoolSize(splitThreads);
-      }
-    }
-
-    int mergeThreads = newConf.getInt(MERGE_THREADS,
-            MERGE_THREADS_DEFAULT);
-    if (this.mergePool.getCorePoolSize() != mergeThreads) {
-      LOG.info("Changing the value of " + MERGE_THREADS +
-                " from " + this.mergePool.getCorePoolSize() + " to " +
-                mergeThreads);
-      if(this.mergePool.getCorePoolSize() < mergeThreads) {
-        this.mergePool.setMaximumPoolSize(mergeThreads);
-        this.mergePool.setCorePoolSize(mergeThreads);
-      } else {
-        this.mergePool.setCorePoolSize(mergeThreads);
-        this.mergePool.setMaximumPoolSize(mergeThreads);
-      }
-    }
-
-    ThroughputController old = this.compactionThroughputController;
-    if (old != null) {
-      old.stop("configuration change");
-    }
-    this.compactionThroughputController =
-        CompactionThroughputControllerFactory.create(server, newConf);
-
-    // We change this atomically here instead of reloading the config in order that upstream
-    // would be the only one with the flexibility to reload the config.
-    this.conf.reloadConfiguration();
-  }
-
-  protected int getSmallCompactionThreadNum() {
-    return this.shortCompactions.getCorePoolSize();
-  }
-
-  protected int getLargeCompactionThreadNum() {
-    return this.longCompactions.getCorePoolSize();
-  }
-
-  protected int getSplitThreadNum() {
-    return this.splits.getCorePoolSize();
-  }
-
-  protected int getMergeThreadNum() {
-    return this.mergePool.getCorePoolSize();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void registerChildren(ConfigurationManager manager) {
-    // No children to register.
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deregisterChildren(ConfigurationManager manager) {
-    // No children to register
-  }
-
-  @VisibleForTesting
-  public ThroughputController getCompactionThroughputController() {
-    return compactionThroughputController;
-  }
-
-  @VisibleForTesting
-  public long getCompletedMergeTaskCount() {
-    return mergePool.getCompletedTaskCount();
-  }
-
-  @VisibleForTesting
-  /**
-   * Shutdown the long compaction thread pool.
-   * Should only be used in unit test to prevent long compaction thread pool from stealing job
-   * from short compaction queue
-   */
-  void shutdownLongCompactions(){
-    this.longCompactions.shutdown();
-  }
-}
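
The resize ordering in the onConfigurationChange() shown above is deliberate: java.util.concurrent.ThreadPoolExecutor insists on core <= max at every step, so a grow must raise the maximum before the core, while a shrink must lower the core before the maximum. A minimal standalone sketch of that rule (PoolResizer is a hypothetical name, not part of this patch):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public final class PoolResizer {
      private PoolResizer() {}

      /** Resize core and max together while keeping core <= max throughout. */
      public static void resize(ThreadPoolExecutor pool, int threads) {
        if (pool.getCorePoolSize() < threads) {
          pool.setMaximumPoolSize(threads);  // grow: max first...
          pool.setCorePoolSize(threads);     // ...then core
        } else {
          pool.setCorePoolSize(threads);     // shrink: core first...
          pool.setMaximumPoolSize(threads);  // ...then max
        }
      }

      public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        resize(pool, 8);
        resize(pool, 4);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
      }
    }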

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b3b5113..2d31f3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.NonceGenerator;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
@@ -148,8 +147,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -161,8 +158,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -170,7 +165,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JSONBean;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
@@ -199,13 +193,13 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -274,7 +268,7 @@ public class HRegionServer extends HasThread implements
   protected ReplicationSinkService replicationSinkHandler;
 
   // Compactions
-  public CompactSplitThread compactSplitThread;
+  public CompactSplit compactSplitThread;
 
   /**
    * Map of regions currently being served by this region server. Key is the
@@ -902,7 +896,7 @@ public class HRegionServer extends HasThread implements
     this.cacheFlusher = new MemStoreFlusher(conf, this);
 
     // Compaction thread
-    this.compactSplitThread = new CompactSplitThread(this);
+    this.compactSplitThread = new CompactSplit(this);
 
     // Background thread to check for compactions; needed if region has not gotten updates
     // in a while. It will take care of not checking too frequently on store-by-store basis.
@@ -1684,7 +1678,7 @@ public class HRegionServer extends HasThread implements
     final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
     final static int MIN_DELAY_TIME = 0; // millisec
     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
-      super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
+      super("MemstoreFlusherChore", server, cacheFlushInterval);
       this.server = server;
     }
 
@@ -2110,6 +2104,7 @@ public class HRegionServer extends HasThread implements
             + " to " + code + ": " + response.getErrorMessage());
           return false;
         }
+        LOG.info("TRANSITION REPORTED " + request);
         return true;
       } catch (ServiceException se) {
         IOException ioe = ProtobufUtil.getRemoteException(se);
@@ -2119,84 +2114,10 @@ public class HRegionServer extends HasThread implements
         }
       }
     }
+    LOG.info("TRANSITION NOT REPORTED " + request);
     return false;
   }
 
-  @Override
-  public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) {
-    NonceGenerator ng = clusterConnection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup();
-    final long nonce = ng.newNonce();
-    long procId = -1;
-    SplitTableRegionRequest request =
-        RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce);
-
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        SplitTableRegionResponse response = rss.splitRegion(null, request);
-
-        //TODO: should we limit the retry number before quitting?
-        if (response == null || (procId = response.getProcId()) == -1) {
-          LOG.warn("Failed to split " + regionInfo + " retrying...");
-          continue;
-        }
-
-        break;
-      } catch (ServiceException se) {
-        // TODO: retry or just fail
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        LOG.info("Failed to split region, will retry", ioe);
-        if (rssStub == rss) {
-          rssStub = null;
-        }
-      }
-    }
-    return procId;
-  }
-
-  @Override
-  public boolean isProcedureFinished(final long procId) throws IOException {
-    GetProcedureResultRequest request =
-        GetProcedureResultRequest.newBuilder().setProcId(procId).build();
-
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        // TODO: find a way to get proc result
-        GetProcedureResultResponse response = rss.getProcedureResult(null, request);
-
-        if (response == null) {
-          LOG.warn("Failed to get procedure (id=" + procId + ") status.");
-          return false;
-        } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
-          return false;
-        } else if (response.hasException()) {
-          // Procedure failed.
-          throw ForeignExceptionUtil.toIOException(response.getException());
-        }
-        // Procedure completes successfully
-        break;
-      } catch (ServiceException se) {
-        // TODO: retry or just fail
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        LOG.warn("Failed to get split region procedure result.  Retrying", ioe);
-        if (rssStub == rss) {
-          rssStub = null;
-        }
-      }
-    }
-    return true;
-  }
-
   /**
    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
    * block this thread. See RegionReplicaFlushHandler for details.
@@ -3419,9 +3340,9 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
-   * @return the underlying {@link CompactSplitThread} for the servers
+   * @return the underlying {@link CompactSplit} for the servers
    */
-  public CompactSplitThread getCompactSplitThread() {
+  public CompactSplit getCompactSplitThread() {
     return this.compactSplitThread;
   }
 
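The two methods deleted above, requestRegionSplit() and isProcedureFinished(), carried the old flow in which the region server submitted a split procedure to the master and then polled for its completion. After this patch the region server only reports a READY_TO_SPLIT (or READY_TO_MERGE) transition and the master-side procedure framework drives the rest. A hedged sketch of the two styles, using illustrative names rather than real HBase APIs:

    // Stand-in for the master-facing calls; not an HBase interface.
    interface Master {
      long submitSplit(String region, byte[] splitRow);
      boolean isProcedureFinished(long procId);
      boolean reportTransition(String code, String... regions);
    }

    final class OldStyle {
      // Pre-patch shape: the region server blocks, polling the master.
      static void split(Master master, String region, byte[] row)
          throws InterruptedException {
        long procId = master.submitSplit(region, row);
        while (!master.isProcedureFinished(procId)) {
          Thread.sleep(1000);
        }
      }
    }

    final class NewStyle {
      // Post-patch shape: fire-and-forget; the master owns the procedure.
      static void split(Master master, String region) {
        if (!master.reportTransition("READY_TO_SPLIT", region)) {
          System.err.println("Unable to ask master to split " + region);
        }
      }
    }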

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 298f538..705442a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -118,12 +119,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -136,6 +137,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
@@ -1399,36 +1402,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  @Override
-  @QosPriority(priority=HConstants.ADMIN_QOS)
-  public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(
-      final RpcController controller,
-      final CloseRegionForSplitOrMergeRequest request) throws ServiceException {
-    try {
-      checkOpen();
-
-      List<String> encodedRegionNameList = new ArrayList<>();
-      for(int i = 0; i < request.getRegionCount(); i++) {
-        final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion(i));
-
-        // Can be null if we're calling close on a region that's not online
-        final Region targetRegion = regionServer.getFromOnlineRegions(encodedRegionName);
-        if ((targetRegion != null) && (targetRegion.getCoprocessorHost() != null)) {
-          targetRegion.getCoprocessorHost().preClose(false);
-          encodedRegionNameList.add(encodedRegionName);
-        }
-      }
-      requestCount.increment();
-      LOG.info("Close and offline " + encodedRegionNameList + " regions.");
-      boolean closed = regionServer.closeAndOfflineRegionForSplitOrMerge(encodedRegionNameList);
-      CloseRegionForSplitOrMergeResponse.Builder builder =
-          CloseRegionForSplitOrMergeResponse.newBuilder().setClosed(closed);
-      return builder.build();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
-    }
-  }
-
   /**
    * Compact a region on the region server.
    *
@@ -1742,8 +1715,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // The region is already online. This should not happen any more.
           String error = "Received OPEN for the region:"
             + region.getRegionNameAsString() + ", which is already online";
-          regionServer.abort(error);
-          throw new IOException(error);
+          LOG.warn(error);
+          //regionServer.abort(error);
+          //throw new IOException(error);
+          builder.addOpeningState(RegionOpeningState.OPENED);
+          continue;
         }
         LOG.info("Open " + region.getRegionNameAsString());
 
@@ -3230,4 +3206,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return UpdateConfigurationResponse.getDefaultInstance();
   }
 
-}
+  @Override
+  public ExecuteProceduresResponse executeProcedures(RpcController controller,
+      ExecuteProceduresRequest request) throws ServiceException {
+    ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
+    if (request.getOpenRegionCount() > 0) {
+      for (OpenRegionRequest req: request.getOpenRegionList()) {
+        builder.addOpenRegion(openRegion(controller, req));
+      }
+    }
+    if (request.getCloseRegionCount() > 0) {
+      for (CloseRegionRequest req: request.getCloseRegionList()) {
+        builder.addCloseRegion(closeRegion(controller, req));
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Merge regions on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @return merge regions response
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority = HConstants.ADMIN_QOS)
+  public MergeRegionsResponse mergeRegions(final RpcController controller,
+      final MergeRegionsRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      Region regionA = getRegion(request.getRegionA());
+      Region regionB = getRegion(request.getRegionB());
+      boolean forcible = request.getForcible();
+      long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
+      regionA.startRegionOperation(Operation.MERGE_REGION);
+      regionB.startRegionOperation(Operation.MERGE_REGION);
+      if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
+          regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+        throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
+      }
+      LOG.info("Receiving merging request for  " + regionA + ", " + regionB
+          + ",forcible=" + forcible);
+      regionA.flush(true);
+      regionB.flush(true);
+      regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
+          masterSystemTime, RpcServer.getRequestUser());
+      return MergeRegionsResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+}
\ No newline at end of file
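
The new executeProcedures() endpoint above is, in essence, a batched dispatcher: every OpenRegionRequest and CloseRegionRequest carried in the request is handed to the existing openRegion()/closeRegion() handler, and the sub-responses are accumulated in order on the response builder. A generic sketch of that shape (BatchDispatcher is an illustrative name, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    final class BatchDispatcher<Req, Resp> {
      private final Function<Req, Resp> handler;

      BatchDispatcher(Function<Req, Resp> handler) {
        this.handler = handler;
      }

      /** Dispatch each sub-request in turn; responses keep request order. */
      List<Resp> execute(List<Req> batch) {
        List<Resp> out = new ArrayList<>(batch.size());
        for (Req req : batch) {
          out.add(handler.apply(req));
        }
        return out;
      }
    }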

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
new file mode 100644
index 0000000..e0980d2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -0,0 +1,109 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing of region merges. Instances are put in a queue owned by HRegionServer.
+ */
+@InterfaceAudience.Private
+class RegionMergeRequest implements Runnable {
+  private static final Log LOG = LogFactory.getLog(RegionMergeRequest.class);
+  private final HRegionInfo region_a;
+  private final HRegionInfo region_b;
+  private final HRegionServer server;
+  private final boolean forcible;
+  private final User user;
+
+  RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
+      long masterSystemTime, User user) {
+    Preconditions.checkNotNull(hrs);
+    this.region_a = a.getRegionInfo();
+    this.region_b = b.getRegionInfo();
+    this.server = hrs;
+    this.forcible = forcible;
+    this.user = user;
+  }
+
+  @Override
+  public String toString() {
+    return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible="
+        + forcible;
+  }
+
+  private void doMerge() {
+    boolean success = false;
+    //server.metricsRegionServer.incrMergeRequest();
+
+    if (user != null && user.getUGI() != null) {
+      user.getUGI().doAs(new PrivilegedAction<Void>() {
+        @Override
+        public Void run() {
+          requestRegionMerge();
+          return null;
+        }
+      });
+    } else {
+      requestRegionMerge();
+    }
+  }
+
+  private void requestRegionMerge() {
+    final TableName table = region_a.getTable();
+    if (!table.equals(region_b.getTable())) {
+      LOG.error("Can't merge regions from two different tables: " + region_a + ", " + region_b);
+      return;
+    }
+
+    // TODO: fake merged region for compat with the report protocol
+    final HRegionInfo merged = new HRegionInfo(table);
+
+    // Send the merge request to the master; the master will do the validation on the regions.
+    // The two parent regions will be unassigned and the merged region will be assigned.
+    // The "merged" object may not reflect the region that will actually be created; it is
+    // built here just to pass the information through reportRegionStateTransition().
+    if (!server.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, merged, region_a, region_b)) {
+      LOG.error("Unable to ask master to merge: " + region_a + ", " + region_b);
+    }
+  }
+
+  @Override
+  public void run() {
+    if (this.server.isStopping() || this.server.isStopped()) {
+      LOG.debug("Skipping merge because server is stopping="
+          + this.server.isStopping() + " or stopped=" + this.server.isStopped());
+      return;
+    }
+
+    doMerge();
+  }
+}
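
RegionMergeRequest above and SplitRequest below share the same impersonation pattern: when the request carries a User with a UGI, the report to the master runs inside that user's context via doAs(); otherwise it runs as the server principal. A minimal sketch of the pattern, assuming HBase's User wrapper as used in this patch (AsUser is a hypothetical helper):

    import java.security.PrivilegedAction;

    import org.apache.hadoop.hbase.security.User;

    final class AsUser {
      /** Run work as the given user when one is available, else inline. */
      static void run(User user, Runnable work) {
        if (user != null && user.getUGI() != null) {
          user.getUGI().doAs((PrivilegedAction<Void>) () -> {
            work.run();
            return null;
          });
        } else {
          work.run();
        }
      }
    }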

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 3382263..623eab2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -177,16 +177,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris);
 
   /**
-   * Notify master that a region wants to be split.
-   */
-  long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow);
-
-  /**
-   * Check with master whether a procedure is completed (either succeed or fail)
-   */
-  boolean isProcedureFinished(final long procId) throws IOException;
-
-  /**
    * Returns a reference to the region server's RPC server
    */
   RpcServerInterface getRpcServer();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index eb9811d..5407cfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -23,8 +23,11 @@ import java.security.PrivilegedAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
@@ -37,14 +40,14 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 class SplitRequest implements Runnable {
   private static final Log LOG = LogFactory.getLog(SplitRequest.class);
-  private final HRegion parent;
+  private final HRegionInfo parent;
   private final byte[] midKey;
   private final HRegionServer server;
   private final User user;
 
   SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) {
     Preconditions.checkNotNull(hrs);
-    this.parent = (HRegion)region;
+    this.parent = region.getRegionInfo();
     this.midKey = midKey;
     this.server = hrs;
     this.user = user;
@@ -58,65 +61,29 @@ class SplitRequest implements Runnable {
   private void doSplitting() {
     boolean success = false;
     server.metricsRegionServer.incrSplitRequest();
-    long startTime = EnvironmentEdgeManager.currentTime();
-
-    try {
-      long procId;
-      if (user != null && user.getUGI() != null) {
-        procId = user.getUGI().doAs (new PrivilegedAction<Long>() {
-          @Override
-          public Long run() {
-            try {
-              return server.requestRegionSplit(parent.getRegionInfo(), midKey);
-            } catch (Exception e) {
-              LOG.error("Failed to complete region split ", e);
-            }
-            return (long)-1;
-          }
-        });
-      } else {
-        procId = server.requestRegionSplit(parent.getRegionInfo(), midKey);
-      }
-
-      if (procId != -1) {
-        // wait for the split to complete or get interrupted.  If the split completes successfully,
-        // the procedure will return true; if the split fails, the procedure would throw exception.
-        //
-        try {
-          while (!(success = server.isProcedureFinished(procId))) {
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              LOG.warn("Split region " + parent + " is still in progress.  Not waiting...");
-              break;
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("Split region " + parent + " failed.", e);
+    if (user != null && user.getUGI() != null) {
+      user.getUGI().doAs(new PrivilegedAction<Void>() {
+        @Override
+        public Void run() {
+          requestRegionSplit();
+          return null;
         }
-      } else {
-        LOG.error("Fail to split region " + parent);
-      }
-    } finally {
-      if (this.parent.getCoprocessorHost() != null) {
-        try {
-          this.parent.getCoprocessorHost().postCompleteSplit();
-        } catch (IOException io) {
-          LOG.error("Split failed " + this,
-            io instanceof RemoteException ? ((RemoteException) io).unwrapRemoteException() : io);
-        }
-      }
-
-      // Update regionserver metrics with the split transaction total running time
-      server.metricsRegionServer.updateSplitTime(EnvironmentEdgeManager.currentTime() - startTime);
-
-      if (parent.shouldForceSplit()) {
-        parent.clearSplit();
-      }
+      });
+    } else {
+      requestRegionSplit();
+    }
+  }
 
-      if (success) {
-        server.metricsRegionServer.incrSplitSuccess();
-      }
+  private void requestRegionSplit() {
+    final TableName table = parent.getTable();
+    final HRegionInfo hri_a = new HRegionInfo(table, parent.getStartKey(), midKey);
+    final HRegionInfo hri_b = new HRegionInfo(table, midKey, parent.getEndKey());
+    // Send the split request to the master; the master will do the validation on the split key.
+    // The parent region will be unassigned and the two new regions will be assigned.
+    // The hri_a and hri_b objects may not reflect the regions that will actually be created;
+    // they are built here just to pass the information to reportRegionStateTransition().
+    if (!server.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent, hri_a, hri_b)) {
+      LOG.error("Unable to ask master to split " + parent.getRegionNameAsString());
     }
   }
 
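The rewritten requestRegionSplit() derives the daughter descriptors purely from the parent's key range and the chosen split point: hri_a covers [startKey, midKey) and hri_b covers [midKey, endKey). A small runnable sketch of that derivation, assuming the HRegionInfo constructor already used in this patch:

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DaughterRanges {
      public static void main(String[] args) {
        TableName table = TableName.valueOf("t1");
        HRegionInfo parent =
            new HRegionInfo(table, Bytes.toBytes("a"), Bytes.toBytes("z"));
        byte[] midKey = Bytes.toBytes("m");
        // Daughter A: [startKey, midKey); daughter B: [midKey, endKey).
        HRegionInfo hriA = new HRegionInfo(table, parent.getStartKey(), midKey);
        HRegionInfo hriB = new HRegionInfo(table, midKey, parent.getEndKey());
        System.out.println(hriA + " / " + hriB);
      }
    }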

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 4eab62b..91cd258 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -559,7 +559,7 @@ public class HBaseFsck extends Configured implements Closeable {
     errors.print("Number of requests: " + status.getRequestsCount());
     errors.print("Number of regions: " + status.getRegionsCount());
 
-    Set<RegionState> rits = status.getRegionsInTransition();
+    List<RegionState> rits = status.getRegionsInTransition();
     errors.print("Number of regions in transition: " + rits.size());
     if (details) {
       for (RegionState state: rits) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index d7749c2..8ea7012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 
 /**
  * Utility methods for interacting with the regions.
@@ -223,7 +223,7 @@ public abstract class ModifyRegionUtils {
   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
       final String threadNamePrefix, int regionNumber) {
     int maxThreads = Math.min(regionNumber, conf.getInt(
-        "hbase.hregion.open.and.init.threads.max", 10));
+        "hbase.hregion.open.and.init.threads.max", 16));
     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
         new ThreadFactory() {
@@ -236,24 +236,4 @@ public abstract class ModifyRegionUtils {
         });
     return regionOpenAndInitThreadPool;
   }
-
-  /**
-   * Triggers a bulk assignment of the specified regions
-   *
-   * @param assignmentManager the Assignment Manger
-   * @param regionInfos the list of regions to assign
-   * @throws IOException if an error occurred during the assignment
-   */
-  public static void assignRegions(final AssignmentManager assignmentManager,
-      final List<HRegionInfo> regionInfos) throws IOException {
-    try {
-      assignmentManager.getRegionStates().createRegionStates(regionInfos);
-      assignmentManager.assign(regionInfos);
-    } catch (InterruptedException e) {
-      LOG.error("Caught " + e + " during round-robin assignment");
-      InterruptedIOException ie = new InterruptedIOException(e.getMessage());
-      ie.initCause(e);
-      throw ie;
-    }
-  }
 }

