hbase-commits mailing list archives

From st...@apache.org
Subject [21/21] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)
Date Thu, 23 Mar 2017 15:43:25 GMT
HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)

    Includes four patches from Matteo's repository, plus fix-ups to get it all to
    pass, fix findbugs, etc. I apply the patches in one go because applying
    each independently puts hbase in a non-working state.

     1. HBASE-14616 Procedure v2 - Replace the old AM with the new AM
     This comes from Matteo's repo here:
     https://github.com/matteobertozzi/hbase/commit/689227fcbfe8e6588433dbcdabf4526e3d478b2e

     This patch replaces the old AM with the new one under the subpackage
     master.assignment. Most changes just update classes to use the new AM
     (import changes) rather than the old one. It also removes the old AM and
     its supporting classes. See below for more detail.

     2. HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)
     Adds running of remote procedures and batching of remote calls.
     Adds support for assign/unassign in procedures. Adds version info
     reporting in RPC. Adds the start of an AMv2.

     3. and 4. are fixes around merge and split.

     This work mostly comes from:
     https://github.com/matteobertozzi/hbase/commit/3622cba4e331d2fc7bfc1932abb4c9cbf5802efa

     Reporting of remote RS version is from here:
     https://github.com/matteobertozzi/hbase/commit/ddb4df3964e8298c88c0210e83493aa91ac0942d.patch

     And remote dispatch of procedures is from:
     https://github.com/matteobertozzi/hbase/commit/186b9e7c4dae61a79509a6c3aad7f80ec61345e5

     The split and merge patches from here are also melded in:
     https://github.com/matteobertozzi/hbase/commit/9a3a95a2c2974842a4849d1ad867e70764e7f707
     and https://github.com/matteobertozzi/hbase/commit/d6289307a02a777299f65238238a2a8af3253067

     Adds a testing util for the new AM and new sets of tests.

     Details:

     M hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
     Takes a List of RegionStates on construction rather than a Set.
     NOTE: this is a change in a public class.

     M hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
     Purge the old overlapping states PENDING_OPEN and PENDING_CLOSE, which
     duplicated OPENING and CLOSING.
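     With the two states gone from the Java enum, the RegionState diff keeps the
     protobuf PENDING_OPEN/PENDING_CLOSE cases but lets them fall through to
     OPENING/CLOSING. A minimal stand-alone sketch of that mapping; the enums
     ProtoState and State are hypothetical stand-ins for
     ClusterStatusProtos.RegionState.State and RegionState.State.

```java
public class LegacyStateMapping {
  // Hypothetical stand-in for the protobuf enum, which still carries the legacy values.
  enum ProtoState { OFFLINE, PENDING_OPEN, OPENING, OPEN, PENDING_CLOSE, CLOSING, CLOSED }

  // Hypothetical stand-in for RegionState.State after the purge.
  enum State { OFFLINE, OPENING, OPEN, CLOSING, CLOSED }

  static State convert(ProtoState protoState) {
    State state;
    switch (protoState) {
      case OFFLINE:
        state = State.OFFLINE;
        break;
      case PENDING_OPEN: // legacy value: falls through to OPENING
      case OPENING:
        state = State.OPENING;
        break;
      case OPEN:
        state = State.OPEN;
        break;
      case PENDING_CLOSE: // legacy value: falls through to CLOSING
      case CLOSING:
        state = State.CLOSING;
        break;
      case CLOSED:
        state = State.CLOSED;
        break;
      default:
        throw new IllegalStateException("Unhandled state " + protoState);
    }
    return state;
  }

  public static void main(String[] args) {
    System.out.println(convert(ProtoState.PENDING_OPEN));  // OPENING
    System.out.println(convert(ProtoState.PENDING_CLOSE)); // CLOSING
  }
}
```

     Falling through rather than throwing lets the master still decode a legacy
     state it receives, while no code path can produce one any more.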

     A hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
     Dispatch remote procedures every 150ms or every 32 items, whichever
     happens first (configurable). Runs a timeout thread.
     Carries the notion of a remote procedure and of a buffer of them.
     "hbase.procedure.remote.dispatcher.threadpool.size" with default = 128
     "hbase.procedure.remote.dispatcher.delay.msec" with default = 150ms
     "hbase.procedure.remote.dispatcher.max.queue.size" with default = 32
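     The "flush on time or on size, whichever comes first" rule can be sketched
     as below. BatchingBuffer and its methods are hypothetical, not the
     RemoteProcedureDispatcher API; time is passed in explicitly so the logic is
     deterministic, where the real dispatcher uses a timeout thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: dispatch a batch after delayMsec or maxQueueSize items. */
public class BatchingBuffer<T> {
  private final int maxQueueSize;   // e.g. 32, like ...dispatcher.max.queue.size
  private final long delayMsec;     // e.g. 150, like ...dispatcher.delay.msec
  private final Consumer<List<T>> dispatch;
  private final List<T> buffer = new ArrayList<>();
  private long firstAddMsec = -1;   // arrival time of the oldest buffered item

  public BatchingBuffer(int maxQueueSize, long delayMsec, Consumer<List<T>> dispatch) {
    this.maxQueueSize = maxQueueSize;
    this.delayMsec = delayMsec;
    this.dispatch = dispatch;
  }

  /** Buffers an item and flushes immediately if the size limit is reached. */
  public void add(T item, long nowMsec) {
    if (buffer.isEmpty()) {
      firstAddMsec = nowMsec;
    }
    buffer.add(item);
    if (buffer.size() >= maxQueueSize) {
      flush();
    }
  }

  /** Called periodically (a timeout thread in a real system); flushes once the delay expires. */
  public void tick(long nowMsec) {
    if (!buffer.isEmpty() && nowMsec - firstAddMsec >= delayMsec) {
      flush();
    }
  }

  private void flush() {
    dispatch.accept(new ArrayList<>(buffer));
    buffer.clear();
    firstAddMsec = -1;
  }
}
```

     Batching trades up to delayMsec of extra latency for fewer RPCs when many
     assigns target the same server at once.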

     M hbase-protocol-shaded/src/main/protobuf/Admin.proto
     Adds the ExecuteProcedures call for executing procedures.

     M hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
     Add assign and unassign state support for procedures.

     M hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
     Adds extraction of the RS version from the RPC.
     Examples: 1.3.4 is 0x0103004, 2.1.0 is 0x0201000.
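     A packing consistent with both examples puts major in the bits from 20 up,
     minor in bits 12-19 and patch in bits 0-11. The sketch below assumes that
     layout; check VersionInfoUtil for the exact field widths.

```java
/** Sketch of packing major.minor.patch into one int (layout assumed from the examples). */
public class VersionNumberSketch {
  static int buildVersionNumber(int major, int minor, int patch) {
    return ((major & 0xff) << 20) | ((minor & 0xff) << 12) | (patch & 0xfff);
  }

  public static void main(String[] args) {
    System.out.printf("0x%07x%n", buildVersionNumber(1, 3, 4)); // 0x0103004
    System.out.printf("0x%07x%n", buildVersionNumber(2, 1, 0)); // 0x0201000
    // Packed numbers compare in the same order as the versions they encode.
    System.out.println(buildVersionNumber(2, 1, 0) > buildVersionNumber(1, 3, 4)); // true
  }
}
```

     A single ordered int makes "is this server at least version X" a plain
     integer comparison.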

     M hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
     Removes the periodic metrics chore; this is now done in the new AM.
     Replaces the old AM with the new one.

     M hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
     Have AMv2 handle assigning meta.

     M hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
     Extracts the version number of the server making the RPC.

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
     Adds the new assign procedure, which runs the assign via procedure dispatch.
     Only one RegionTransitionProcedure can run per region at a time, since
     each procedure takes a lock on the region.
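     The one-procedure-per-region rule follows from an exclusive per-region
     lock. A minimal sketch, assuming region names are Strings and procedure
     ids are longs; RegionLockSketch is hypothetical, not the
     MasterProcedureScheduler API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: at most one procedure holds a region's lock at a time. */
public class RegionLockSketch {
  private final ConcurrentMap<String, Long> owners = new ConcurrentHashMap<>();

  /** Returns true if procId now holds (or already held) the lock on region. */
  public boolean tryLock(String region, long procId) {
    Long owner = owners.putIfAbsent(region, procId);
    return owner == null || owner == procId;
  }

  /** Releases the lock only if procId is the current holder. */
  public boolean unlock(String region, long procId) {
    return owners.remove(region, procId);
  }
}
```

     A second assign or unassign for a locked region fails tryLock and has to
     wait, so transitions on one region are serialized while different regions
     proceed in parallel.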

     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java

     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
     D hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
     A procedure-based AM (AMv2).

     TODO
      - handle region migration
      - handle meta assignment first
      - handle sys table assignment first (e.g. acl, namespace)
      - handle table priorities

     Configuration:
      "hbase.assignment.bootstrap.thread.pool.size"; default is 16
      "hbase.assignment.dispatch.wait.msec"; default is 150
      "hbase.assignment.dispatch.wait.queue.max.size"; default is 100
      "hbase.assignment.rit.chore.interval.msec"; default is 5000 (5 * 1000)
      "hbase.assignment.maximum.attempts"; default is 10
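     A small sketch of reading the AM settings listed above with their stated
     defaults. It uses java.util.Properties as a stand-in for the Hadoop
     Configuration object (whose getInt(name, default) plays the same role);
     the constant names are hypothetical.

```java
import java.util.Properties;

public class AssignmentConfigSketch {
  // Keys and defaults as listed in the commit message; constant names are hypothetical.
  static final String BOOTSTRAP_THREAD_POOL_SIZE = "hbase.assignment.bootstrap.thread.pool.size";
  static final String DISPATCH_WAIT_MSEC = "hbase.assignment.dispatch.wait.msec";
  static final String DISPATCH_WAIT_QUEUE_MAX_SIZE = "hbase.assignment.dispatch.wait.queue.max.size";
  static final String RIT_CHORE_INTERVAL_MSEC = "hbase.assignment.rit.chore.interval.msec";
  static final String MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";

  /** Reads an int setting, falling back to the default when the key is unset. */
  static int getInt(Properties conf, String key, int defaultValue) {
    String v = conf.getProperty(key);
    return v == null ? defaultValue : Integer.parseInt(v.trim());
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty(MAX_ATTEMPTS, "5"); // override one setting
    System.out.println(getInt(conf, BOOTSTRAP_THREAD_POOL_SIZE, 16));    // 16
    System.out.println(getInt(conf, DISPATCH_WAIT_MSEC, 150));           // 150
    System.out.println(getInt(conf, DISPATCH_WAIT_QUEUE_MAX_SIZE, 100)); // 100
    System.out.println(getInt(conf, RIT_CHORE_INTERVAL_MSEC, 5 * 1000)); // 5000
    System.out.println(getInt(conf, MAX_ATTEMPTS, 10));                  // 5
  }
}
```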

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
     Procedure that runs subprocedures to unassign the region and then assign it to the new location.
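     The move can be pictured as a two-state machine: unassign first, then
     assign. In this hypothetical sketch each "subprocedure" is only recorded
     as a string; in the real procedure framework the parent would wait for
     each child to finish before advancing. MoveRegionSketch is not the
     MoveRegionProcedure API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a move as two ordered sub-steps. */
public class MoveRegionSketch {
  enum MoveState { UNASSIGN, ASSIGN, DONE }

  private MoveState state = MoveState.UNASSIGN;
  final List<String> spawned = new ArrayList<>();

  /** Advances one step; returns false once the move is complete. */
  boolean executeStep(String region, String source, String destination) {
    switch (state) {
      case UNASSIGN:
        spawned.add("unassign " + region + " from " + source);
        state = MoveState.ASSIGN;
        return true;
      case ASSIGN:
        spawned.add("assign " + region + " to " + destination);
        state = MoveState.DONE;
        return true;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    MoveRegionSketch move = new MoveRegionSketch();
    while (move.executeStep("r1", "rs-a", "rs-b")) { }
    System.out.println(move.spawned); // [unassign r1 from rs-a, assign r1 to rs-b]
  }
}
```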

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
     Manages the store of region state (in hbase:meta by default).

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
     In-memory state of all regions. Used by AMv2.

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
     Base RIT procedure for Assign and Unassign.

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
     Unassign procedure.

     A hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
     Runs region assignment in a manner that pays attention to the target server version.
     Adds "hbase.regionserver.rpc.startup.waittime"; default is 60 seconds.
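     One way to "pay attention to target server version" is to pick the RPC
     path from the server's packed version number. This sketch assumes servers
     at 2.0.0 or later take batched ExecuteProcedures calls while older ones get
     the legacy per-region open/close RPCs; the cut-over version and the path
     names are assumptions, not taken from RSProcedureDispatcher.

```java
/** Hypothetical sketch of choosing a dispatch path from the remote server version. */
public class VersionAwareDispatchSketch {
  enum DispatchPath { EXECUTE_PROCEDURES_BATCH, LEGACY_OPEN_CLOSE_RPCS }

  // Packing consistent with the VersionInfoUtil examples (1.3.4 is 0x0103004).
  static int versionNumber(int major, int minor, int patch) {
    return ((major & 0xff) << 20) | ((minor & 0xff) << 12) | (patch & 0xfff);
  }

  // Assumed minimum version that understands ExecuteProcedures.
  static final int MIN_BATCH_VERSION = versionNumber(2, 0, 0);

  static DispatchPath choosePath(int remoteVersionNumber) {
    return remoteVersionNumber >= MIN_BATCH_VERSION
        ? DispatchPath.EXECUTE_PROCEDURES_BATCH
        : DispatchPath.LEGACY_OPEN_CLOSE_RPCS;
  }
}
```

     Keeping a fallback path lets a new master drive older region servers
     during a rolling upgrade.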


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8faab93a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8faab93a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8faab93a

Branch: refs/heads/HBASE-14614
Commit: 8faab93a9c5a25ea141d6d750aa31776ef078f66
Parents: f1c1f25
Author: Michael Stack <stack@apache.org>
Authored: Wed Mar 22 09:31:14 2017 -0700
Committer: Michael Stack <stack@apache.org>
Committed: Thu Mar 23 08:42:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ClusterStatus.java  |     8 +-
 .../apache/hadoop/hbase/MetaTableAccessor.java  |     2 +-
 .../hbase/client/ConnectionImplementation.java  |    12 +
 .../client/ShortCircuitMasterConnection.java    |    13 +-
 .../hbase/ipc/ServerTooBusyException.java       |     7 +-
 .../apache/hadoop/hbase/master/RegionState.java |    24 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |     2 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java     |   125 +-
 .../hbase/shaded/protobuf/RequestConverter.java |    16 +-
 .../shaded/protobuf/ResponseConverter.java      |    13 -
 .../org/apache/hadoop/hbase/ChoreService.java   |     2 +-
 .../org/apache/hadoop/hbase/HConstants.java     |     2 +-
 .../java/org/apache/hadoop/hbase/TableName.java |     7 +-
 .../hadoop/hbase/zookeeper/TestZKConfig.java    |     1 +
 .../master/MetricsAssignmentManagerSource.java  |    23 +-
 .../MetricsAssignmentManagerSourceImpl.java     |    38 +-
 .../procedure2/AbstractProcedureScheduler.java  |     4 +-
 .../hadoop/hbase/procedure2/LockAndQueue.java   |    29 +-
 .../hadoop/hbase/procedure2/Procedure.java      |     6 +-
 .../hadoop/hbase/procedure2/ProcedureEvent.java |     6 +-
 .../hbase/procedure2/ProcedureExecutor.java     |    18 +-
 .../hbase/procedure2/ProcedureScheduler.java    |     4 +-
 .../procedure2/RemoteProcedureDispatcher.java   |   367 +
 .../hbase/procedure2/StateMachineProcedure.java |     3 +
 .../hbase/procedure2/util/DelayedUtil.java      |    58 +-
 .../procedure2/ProcedureTestingUtility.java     |    12 +-
 .../hbase/procedure2/util/TestDelayedUtil.java  |     2 +-
 .../shaded/protobuf/generated/AdminProtos.java  | 17559 ++++++++++-------
 .../generated/MasterProcedureProtos.java        |  7446 +++++--
 .../shaded/protobuf/generated/MasterProtos.java |  6937 +++++--
 .../generated/RegionServerStatusProtos.java     |  1633 +-
 .../src/main/protobuf/Admin.proto               |    47 +-
 .../src/main/protobuf/Master.proto              |    36 +
 .../src/main/protobuf/MasterProcedure.proto     |    95 +-
 .../src/main/protobuf/RegionServerStatus.proto  |    27 -
 .../hbase/rsgroup/RSGroupAdminServer.java       |    12 +-
 .../hbase/rsgroup/RSGroupBasedLoadBalancer.java |     2 +-
 .../balancer/TestRSGroupBasedLoadBalancer.java  |     2 +-
 .../master/AssignmentManagerStatusTmpl.jamon    |    51 +-
 .../hbase/tmpl/master/MasterStatusTmpl.jamon    |     2 +-
 .../hadoop/hbase/client/VersionInfoUtil.java    |    81 +-
 .../hbase/ipc/BalancedQueueRpcExecutor.java     |     3 -
 .../ipc/FastPathBalancedQueueRpcExecutor.java   |     2 +-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |    10 +-
 .../hadoop/hbase/ipc/SimpleRpcServer.java       |     4 +-
 .../hadoop/hbase/master/AssignCallable.java     |    49 -
 .../hadoop/hbase/master/AssignmentManager.java  |  3053 ---
 .../hadoop/hbase/master/BulkAssigner.java       |   122 -
 .../apache/hadoop/hbase/master/BulkReOpen.java  |   136 -
 .../hadoop/hbase/master/CatalogJanitor.java     |     6 +-
 .../hbase/master/GeneralBulkAssigner.java       |   213 -
 .../org/apache/hadoop/hbase/master/HMaster.java |   165 +-
 .../hadoop/hbase/master/LoadBalancer.java       |     4 +-
 .../hbase/master/MasterCoprocessorHost.java     |    22 +
 .../hadoop/hbase/master/MasterDumpServlet.java  |     8 +-
 .../hbase/master/MasterMetaBootstrap.java       |    43 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |   259 +-
 .../hadoop/hbase/master/MasterServices.java     |    21 +
 .../hadoop/hbase/master/MasterWalManager.java   |    11 +-
 .../hbase/master/MetricsAssignmentManager.java  |    39 +-
 .../hadoop/hbase/master/RegionStateStore.java   |   268 -
 .../hadoop/hbase/master/RegionStates.java       |  1170 --
 .../hadoop/hbase/master/ServerManager.java      |    75 +-
 .../hbase/master/TableNamespaceManager.java     |     5 +-
 .../hadoop/hbase/master/UnAssignCallable.java   |    47 -
 .../master/assignment/AssignProcedure.java      |   270 +
 .../master/assignment/AssignmentManager.java    |  1660 ++
 .../assignment/MergeTableRegionsProcedure.java  |   717 +
 .../master/assignment/MoveRegionProcedure.java  |   147 +
 .../master/assignment/RegionStateStore.java     |   327 +
 .../hbase/master/assignment/RegionStates.java   |   864 +
 .../assignment/RegionTransitionProcedure.java   |   315 +
 .../assignment/SplitTableRegionProcedure.java   |   731 +
 .../master/assignment/UnassignProcedure.java    |   216 +
 .../hbase/master/balancer/BaseLoadBalancer.java |     2 +-
 .../master/balancer/RegionLocationFinder.java   |    14 +-
 .../master/balancer/SimpleLoadBalancer.java     |     9 +-
 .../master/balancer/StochasticLoadBalancer.java |    17 +-
 .../hbase/master/locking/LockProcedure.java     |    36 +-
 .../AbstractStateMachineTableProcedure.java     |     2 +-
 .../procedure/AddColumnFamilyProcedure.java     |    31 +-
 .../procedure/CloneSnapshotProcedure.java       |     4 +-
 .../procedure/CreateNamespaceProcedure.java     |     1 -
 .../master/procedure/CreateTableProcedure.java  |    41 +-
 .../procedure/DeleteColumnFamilyProcedure.java  |    31 +-
 .../master/procedure/DeleteTableProcedure.java  |    10 +-
 .../master/procedure/DisableTableProcedure.java |   154 +-
 .../DispatchMergingRegionsProcedure.java        |   584 +
 .../master/procedure/EnableTableProcedure.java  |   172 +-
 .../procedure/MasterDDLOperationHelper.java     |    93 +-
 .../procedure/MasterProcedureConstants.java     |     2 +-
 .../master/procedure/MasterProcedureEnv.java    |    28 +-
 .../procedure/MasterProcedureScheduler.java     |    95 +-
 .../procedure/MergeTableRegionsProcedure.java   |   909 -
 .../procedure/ModifyColumnFamilyProcedure.java  |    30 +-
 .../master/procedure/ModifyTableProcedure.java  |    30 +-
 .../master/procedure/ProcedureSyncWait.java     |   144 +-
 .../master/procedure/RSProcedureDispatcher.java |   542 +
 .../procedure/RestoreSnapshotProcedure.java     |    27 +-
 .../master/procedure/ServerCrashProcedure.java  |   492 +-
 .../procedure/SplitTableRegionProcedure.java    |   785 -
 .../procedure/TruncateTableProcedure.java       |     6 +-
 .../hadoop/hbase/quotas/MasterQuotaManager.java |    18 +-
 .../hadoop/hbase/regionserver/CompactSplit.java |   736 +
 .../hbase/regionserver/CompactSplitThread.java  |   722 -
 .../hbase/regionserver/HRegionServer.java       |    99 +-
 .../hbase/regionserver/RSRpcServices.java       |   102 +-
 .../hbase/regionserver/RegionMergeRequest.java  |   109 +
 .../regionserver/RegionServerServices.java      |    10 -
 .../hadoop/hbase/regionserver/SplitRequest.java |    85 +-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |     2 +-
 .../hadoop/hbase/util/ModifyRegionUtils.java    |    24 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java    |     5 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |     7 +-
 .../hadoop/hbase/MockRegionServerServices.java  |    10 -
 .../hadoop/hbase/TestRegionRebalancing.java     |    13 +-
 .../hbase/TestStochasticBalancerJmxMetrics.java |     2 +-
 .../apache/hadoop/hbase/client/TestAdmin1.java  |    20 +-
 .../apache/hadoop/hbase/client/TestAdmin2.java  |     4 +-
 .../hadoop/hbase/client/TestEnableTable.java    |    34 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |   119 +-
 .../hbase/client/TestMetaWithReplicas.java      |     4 +-
 .../client/TestScannersFromClientSide.java      |    10 +-
 .../hbase/client/TestServerBusyException.java   |   234 +
 .../hbase/client/TestTableFavoredNodes.java     |     7 +-
 .../coprocessor/TestIncrementTimeRange.java     |     5 +-
 .../hbase/io/encoding/TestChangingEncoding.java |     8 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       |    13 +-
 .../hbase/master/MockNoopMasterServices.java    |    14 +-
 .../hadoop/hbase/master/MockRegionServer.java   |    37 +-
 .../hbase/master/TestAssignmentListener.java    |     1 +
 .../master/TestAssignmentManagerOnCluster.java  |  1402 --
 .../hadoop/hbase/master/TestCatalogJanitor.java |     1 +
 .../master/TestDistributedLogSplitting.java     |     1 +
 .../apache/hadoop/hbase/master/TestMaster.java  |     1 +
 .../master/TestMasterBalanceThrottling.java     |     9 +-
 .../hadoop/hbase/master/TestMasterFailover.java |    19 +-
 .../hbase/master/TestMasterStatusServlet.java   |     5 +-
 .../hbase/master/TestMetaShutdownHandler.java   |     1 +
 .../hadoop/hbase/master/TestRegionState.java    |    17 +-
 .../hadoop/hbase/master/TestRegionStates.java   |   144 -
 .../hadoop/hbase/master/TestRestartCluster.java |     1 +
 .../assignment/AssignmentTestingUtil.java       |   125 +
 .../master/assignment/MockMasterServices.java   |   201 +
 .../assignment/TestAssignmentManager.java       |   567 +
 .../assignment/TestAssignmentOnRSCrash.java     |   185 +
 .../TestMergeTableRegionsProcedure.java         |   239 +
 .../master/assignment/TestRegionStates.java     |   226 +
 .../TestSplitTableRegionProcedure.java          |   427 +
 .../MasterProcedureTestingUtility.java          |    67 +-
 .../procedure/TestAddColumnFamilyProcedure.java |    34 +-
 .../procedure/TestCloneSnapshotProcedure.java   |    10 +-
 .../procedure/TestCreateNamespaceProcedure.java |     4 +-
 .../procedure/TestCreateTableProcedure.java     |    46 +-
 .../TestDeleteColumnFamilyProcedure.java        |    31 +-
 .../procedure/TestDeleteNamespaceProcedure.java |     4 +-
 .../procedure/TestDeleteTableProcedure.java     |    21 +-
 .../procedure/TestDisableTableProcedure.java    |    24 +-
 .../procedure/TestEnableTableProcedure.java     |    24 +-
 .../TestMasterFailoverWithProcedures.java       |    23 +-
 .../procedure/TestMasterProcedureEvents.java    |     2 +-
 .../TestModifyColumnFamilyProcedure.java        |     9 +-
 .../procedure/TestModifyNamespaceProcedure.java |     4 +-
 .../procedure/TestModifyTableProcedure.java     |    18 +-
 .../master/procedure/TestProcedureAdmin.java    |    13 +-
 .../procedure/TestRestoreSnapshotProcedure.java |    13 +-
 .../procedure/TestServerCrashProcedure.java     |   110 +-
 .../TestSplitTableRegionProcedure.java          |   420 -
 .../procedure/TestTableDDLProcedureBase.java    |     7 +-
 .../procedure/TestTruncateTableProcedure.java   |    11 +-
 .../hbase/namespace/TestNamespaceAuditor.java   |     4 +-
 .../procedure/SimpleMasterProcedureManager.java |     2 +-
 .../regionserver/TestCompactSplitThread.java    |    24 +-
 .../hbase/regionserver/TestCompaction.java      |    10 +-
 .../regionserver/TestHRegionFileSystem.java     |     6 +-
 .../TestRegionMergeTransactionOnCluster.java    |    14 +-
 .../TestSplitTransactionOnCluster.java          |    76 +-
 .../regionserver/wal/TestAsyncLogRolling.java   |     9 +
 .../hbase/regionserver/wal/TestLogRolling.java  |     5 +
 .../wal/TestSecureAsyncWALReplay.java           |     5 +
 .../hbase/regionserver/wal/TestWALReplay.java   |     5 +
 .../hadoop/hbase/util/BaseTestHBaseFsck.java    |     4 +-
 .../hadoop/hbase/util/TestHBaseFsckMOB.java     |     2 +-
 .../hadoop/hbase/util/TestHBaseFsckOneRS.java   |    75 +-
 .../hbase/util/TestHBaseFsckReplicas.java       |     2 +-
 .../hadoop/hbase/util/TestHBaseFsckTwoRS.java   |    21 +-
 .../src/test/resources/log4j.properties         |     1 +
 187 files changed, 32709 insertions(+), 24483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
index f00016d..a7a26a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -81,7 +81,7 @@ public class ClusterStatus extends VersionedWritable {
   private Collection<ServerName> deadServers;
   private ServerName master;
   private Collection<ServerName> backupMasters;
-  private Set<RegionState> intransition;
+  private List<RegionState> intransition;
   private String clusterId;
   private String[] masterCoprocessors;
   private Boolean balancerOn;
@@ -91,7 +91,7 @@ public class ClusterStatus extends VersionedWritable {
       final Collection<ServerName> deadServers,
       final ServerName master,
       final Collection<ServerName> backupMasters,
-      final Set<RegionState> rit,
+      final List<RegionState> rit,
       final String[] masterCoprocessors,
       final Boolean balancerOn) {
     this.hbaseVersion = hbaseVersion;
@@ -262,7 +262,7 @@ public class ClusterStatus extends VersionedWritable {
   }
 
   @InterfaceAudience.Private
-  public Set<RegionState> getRegionsInTransition() {
+  public List<RegionState> getRegionsInTransition() {
     return this.intransition;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index ee8d5fd..15bc132 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -2049,7 +2049,7 @@ public class MetaTableAccessor {
       + Bytes.toStringBinary(HConstants.MERGEB_QUALIFIER));
   }
 
-  private static Put addRegionInfo(final Put p, final HRegionInfo hri)
+  public static Put addRegionInfo(final Put p, final HRegionInfo hri)
     throws IOException {
     p.addImmutable(getCatalogFamily(), HConstants.REGIONINFO_QUALIFIER,
       hri.toByteArray());

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index adf1496..135946f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1316,6 +1316,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
         return stub.mergeTableRegions(controller, request);
       }
 
+      public MasterProtos.DispatchMergingRegionsResponse dispatchMergingRegions(
+          RpcController controller, MasterProtos.DispatchMergingRegionsRequest request)
+          throws ServiceException {
+        return stub.dispatchMergingRegions(controller, request);
+      }
+
       @Override
       public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
           MasterProtos.AssignRegionRequest request) throws ServiceException {
@@ -1335,6 +1341,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       }
 
       @Override
+      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
+          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
+        return stub.splitRegion(controller, request);
+      }
+
+      @Override
       public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
           MasterProtos.DeleteTableRequest request) throws ServiceException {
         return stub.deleteTable(controller, request);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index d70c76f..3469782 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -480,4 +480,15 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
     return stub.listReplicationPeers(controller, request);
   }
 
-}
+  @Override
+  public SplitTableRegionResponse splitRegion(RpcController controller, SplitTableRegionRequest request)
+      throws ServiceException {
+    return stub.splitRegion(controller, request);
+  }
+
+  @Override
+  public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
+      DispatchMergingRegionsRequest request) throws ServiceException {
+    return stub.dispatchMergingRegions(controller, request);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
index c6ba030..0dd8e64 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
- * Throw this in rpc call if there are too many pending requests for one region server
+ * Throw this in RPC call if there are too many pending requests for one region server
  */
+@SuppressWarnings("serial")
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class ServerTooBusyException extends DoNotRetryIOException {
-
   public ServerTooBusyException(InetSocketAddress address, long count) {
-    super("There are " + count + " concurrent rpc requests for " + address);
+    super("Busy Server! " + count + " concurrent RPCs against " + address);
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
index a930732..7116763 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
@@ -36,10 +36,8 @@ public class RegionState {
   @InterfaceStability.Evolving
   public enum State {
     OFFLINE,        // region is in an offline state
-    PENDING_OPEN,   // same as OPENING, to be removed
     OPENING,        // server has begun to open but not yet done
     OPEN,           // server opened region and updated meta
-    PENDING_CLOSE,  // same as CLOSING, to be removed
     CLOSING,        // server has begun to close but not yet done
     CLOSED,         // server closed region and updated meta
     SPLITTING,      // server started split of a region
@@ -64,18 +62,12 @@ public class RegionState {
       case OFFLINE:
         rs = ClusterStatusProtos.RegionState.State.OFFLINE;
         break;
-      case PENDING_OPEN:
-        rs = ClusterStatusProtos.RegionState.State.PENDING_OPEN;
-        break;
       case OPENING:
         rs = ClusterStatusProtos.RegionState.State.OPENING;
         break;
       case OPEN:
         rs = ClusterStatusProtos.RegionState.State.OPEN;
         break;
-      case PENDING_CLOSE:
-        rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE;
-        break;
       case CLOSING:
         rs = ClusterStatusProtos.RegionState.State.CLOSING;
         break;
@@ -124,8 +116,6 @@ public class RegionState {
         state = OFFLINE;
         break;
       case PENDING_OPEN:
-        state = PENDING_OPEN;
-        break;
       case OPENING:
         state = OPENING;
         break;
@@ -133,8 +123,6 @@ public class RegionState {
         state = OPEN;
         break;
       case PENDING_CLOSE:
-        state = PENDING_CLOSE;
-        break;
       case CLOSING:
         state = CLOSING;
         break;
@@ -166,7 +154,7 @@ public class RegionState {
         state = MERGING_NEW;
         break;
       default:
-        throw new IllegalStateException("");
+        throw new IllegalStateException("Unhandled state " + protoState);
       }
       return state;
     }
@@ -231,22 +219,16 @@ public class RegionState {
     this.ritDuration += (this.stamp - previousStamp);
   }
 
-  /**
-   * PENDING_CLOSE (to be removed) is the same as CLOSING
-   */
   public boolean isClosing() {
-    return state == State.PENDING_CLOSE || state == State.CLOSING;
+    return state == State.CLOSING;
   }
 
   public boolean isClosed() {
     return state == State.CLOSED;
   }
 
-  /**
-   * PENDING_OPEN (to be removed) is the same as OPENING
-   */
   public boolean isOpening() {
-    return state == State.PENDING_OPEN || state == State.OPENING;
+    return state == State.OPENING;
   }
 
   public boolean isOpened() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 4f68447..fcf2c34 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1803,7 +1803,7 @@ public final class ProtobufUtil {
    * has a serialized {@link ServerName} in it.
    * @return Returns null if <code>data</code> is null else converts passed data
    * to a ServerName instance.
-   * @throws DeserializationException 
+   * @throws DeserializationException
    */
   public static ServerName toServerName(final byte [] data) throws DeserializationException {
     if (data == null || data.length <= 0) return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index f44979c..5cec10d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,19 +20,19 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -83,11 +83,13 @@ import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
 import org.apache.hadoop.hbase.quotas.QuotaScope;
 import org.apache.hadoop.hbase.quotas.QuotaType;
 import org.apache.hadoop.hbase.quotas.ThrottleType;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.Authorizations;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
@@ -101,8 +103,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -149,7 +149,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -166,6 +165,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -1814,33 +1814,6 @@ public final class ProtobufUtil {
   }
 
   /**
-   * A helper to close a region for split or merge
-   * using admin protocol.
-   *
-   * @param controller RPC controller
-   * @param admin Admin service
-   * @param server the RS that hosts the target region
-   * @param regionInfo the target region info
-   * @return true if the region is closed
-   * @throws IOException
-   */
-  public static boolean closeRegionForSplitOrMerge(
-      final RpcController controller,
-      final AdminService.BlockingInterface admin,
-      final ServerName server,
-      final HRegionInfo... regionInfo) throws IOException {
-    CloseRegionForSplitOrMergeRequest closeRegionForRequest =
-        ProtobufUtil.buildCloseRegionForSplitOrMergeRequest(server, regionInfo);
-    try {
-      CloseRegionForSplitOrMergeResponse response =
-          admin.closeRegionForSplitOrMerge(controller, closeRegionForRequest);
-      return ResponseConverter.isClosed(response);
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
-    }
-  }
-
-  /**
    * A helper to warmup a region given a region name
    * using admin protocol
    *
@@ -1992,6 +1965,46 @@ public final class ProtobufUtil {
     }
   }
 
+  /**
+   * A helper to merge regions using the admin protocol; sends the request to the regionserver.
+   * @param controller RPC controller
+   * @param admin Admin service of the target regionserver
+   * @param region_a first region to merge
+   * @param region_b second region to merge
+   * @param forcible true to force the merge; otherwise only two adjacent
+   *          regions are merged
+   * @param user effective user; may be null
+   * @throws IOException
+   */
+  public static void mergeRegions(final RpcController controller,
+      final AdminService.BlockingInterface admin,
+      final HRegionInfo region_a, final HRegionInfo region_b,
+      final boolean forcible, final User user) throws IOException {
+    final MergeRegionsRequest request = ProtobufUtil.buildMergeRegionsRequest(
+        region_a.getRegionName(), region_b.getRegionName(), forcible);
+    if (user != null) {
+      try {
+        user.runAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            admin.mergeRegions(controller, request);
+            return null;
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    } else {
+      try {
+        admin.mergeRegions(controller, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+  }
+
 // End helpers for Admin
 
   /*
@@ -3003,8 +3016,8 @@ public final class ProtobufUtil {
       backupMasters.add(ProtobufUtil.toServerName(sn));
     }
 
-    Set<RegionState> rit = null;
-    rit = new HashSet<>(proto.getRegionsInTransitionList().size());
+    List<RegionState> rit =
+      new ArrayList<>(proto.getRegionsInTransitionList().size());
     for (RegionInTransition region : proto.getRegionsInTransitionList()) {
       RegionState value = RegionState.convert(region.getRegionState());
       rit.add(value);
@@ -3163,26 +3176,6 @@ public final class ProtobufUtil {
   }
 
   /**
-   * Create a CloseRegionForSplitOrMergeRequest for given regions
-   *
-   * @param server the RS server that hosts the region
-   * @param regionsToClose the info of the regions to close
-   * @return a CloseRegionForSplitRequest
-   */
-  public static CloseRegionForSplitOrMergeRequest buildCloseRegionForSplitOrMergeRequest(
-      final ServerName server,
-      final HRegionInfo... regionsToClose) {
-    CloseRegionForSplitOrMergeRequest.Builder builder =
-        CloseRegionForSplitOrMergeRequest.newBuilder();
-    for(int i = 0; i < regionsToClose.length; i++) {
-        RegionSpecifier regionToClose = RequestConverter.buildRegionSpecifier(
-          RegionSpecifierType.REGION_NAME, regionsToClose[i].getRegionName());
-        builder.addRegion(regionToClose);
-    }
-    return builder.build();
-  }
-
-  /**
     * Create a CloseRegionRequest for a given encoded region name
     *
     * @param encodedRegionName the name of the region to close
@@ -3220,6 +3213,28 @@ public final class ProtobufUtil {
      return builder.build();
    }
 
+   /**
+    * Create a MergeRegionsRequest for the given regions
+    * @param regionA name of region a
+    * @param regionB name of region b
+    * @param forcible true if it is a compulsory merge
+    * @return a MergeRegionsRequest
+    */
+   public static MergeRegionsRequest buildMergeRegionsRequest(
+       final byte[] regionA, final byte[] regionB, final boolean forcible) {
+     MergeRegionsRequest.Builder builder = MergeRegionsRequest.newBuilder();
+     RegionSpecifier regionASpecifier = RequestConverter.buildRegionSpecifier(
+         RegionSpecifierType.REGION_NAME, regionA);
+     RegionSpecifier regionBSpecifier = RequestConverter.buildRegionSpecifier(
+         RegionSpecifierType.REGION_NAME, regionB);
+     builder.setRegionA(regionASpecifier);
+     builder.setRegionB(regionBSpecifier);
+     builder.setForcible(forcible);
+     // send the master's wall clock time as well, so that the RS can refer to it
+     builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
+     return builder.build();
+   }
+
   /**
    * Get a ServerName from the passed in data bytes.
    * @param data Data with a serialize server name in it; can handle the old style
@@ -3263,4 +3278,4 @@ public final class ProtobufUtil {
     int port = Addressing.parsePort(str);
     return ServerName.valueOf(hostname, port, -1L);
   }
-}
\ No newline at end of file
+}
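The new mergeRegions helper either calls the admin stub directly or runs it inside a PrivilegedExceptionAction as the effective user, turning an InterruptedException into an InterruptedIOException. The following is a minimal, self-contained sketch of that same pattern, not the HBase code: RunAsUser and RunAsHelper are hypothetical names, with RunAsUser standing in for org.apache.hadoop.hbase.security.User. Unlike the patch, which converts only a ServiceException from the direct call, this sketch wraps any other checked exception from either path in an IOException.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;

/** Hypothetical stand-in for an effective user such as HBase's User; not the real API. */
interface RunAsUser {
  <T> T runAs(PrivilegedExceptionAction<T> action) throws IOException, InterruptedException;
}

final class RunAsHelper {
  private RunAsHelper() {}

  /**
   * Runs the action as the given user when one is supplied, otherwise runs it directly.
   * An InterruptedException surfaces as an InterruptedIOException whose cause is the original
   * exception; other checked exceptions are wrapped in an IOException.
   */
  static <T> T callAs(RunAsUser user, PrivilegedExceptionAction<T> action) throws IOException {
    try {
      return user != null ? user.runAs(action) : action.run();
    } catch (InterruptedException ie) {
      InterruptedIOException iioe = new InterruptedIOException();
      iioe.initCause(ie);
      throw iioe;
    } catch (IOException | RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}
```

Keeping the interrupt visible as an IOException subtype lets callers that only declare IOException still tell an interruption apart from an ordinary RPC failure.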

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index a513d66..7b50c3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -117,7 +117,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -1113,19 +1112,6 @@ public final class RequestConverter {
     return builder.build();
   }
 
-  public static SplitTableRegionRequest buildSplitTableRegionRequest(
-      final HRegionInfo regionInfo,
-      final byte[] splitPoint,
-      final long nonceGroup,
-      final long nonce) {
-    SplitTableRegionRequest.Builder builder = SplitTableRegionRequest.newBuilder();
-    builder.setRegionInfo(HRegionInfo.convert(regionInfo));
-    builder.setSplitRow(UnsafeByteOperations.unsafeWrap(splitPoint));
-    builder.setNonceGroup(nonceGroup);
-    builder.setNonce(nonce);
-    return builder.build();
-  }
-
   /**
    * Create a protocol buffer AssignRegionRequest
    *
@@ -1508,7 +1494,7 @@ public final class RequestConverter {
   /**
    * Create a RegionOpenInfo based on given region info and version of offline node
    */
-  private static RegionOpenInfo buildRegionOpenInfo(
+  public static RegionOpenInfo buildRegionOpenInfo(
       final HRegionInfo region,
       final List<ServerName> favoredNodes, Boolean openForReplay) {
     RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index ecadbbc..c489628 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.SingleResponse;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
@@ -254,18 +253,6 @@ public final class ResponseConverter {
   }
 
   /**
-   * Check if the region is closed from a CloseRegionForSplitResponse
-   *
-   * @param proto the CloseRegionForSplitResponse
-   * @return the region close state
-   */
-  public static boolean isClosed
-      (final CloseRegionForSplitOrMergeResponse proto) {
-    if (proto == null || !proto.hasClosed()) return false;
-    return proto.getClosed();
-  }
-
-  /**
    * A utility to build a GetServerInfoResponse.
    *
    * @param serverName

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
index d4ec48e..19363d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -248,7 +248,7 @@ public class ChoreService implements ChoreServicer {
    */
   static class ChoreServiceThreadFactory implements ThreadFactory {
     private final String threadPrefix;
-    private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
+    private final static String THREAD_NAME_SUFFIX = "_Chore_";
     private AtomicInteger threadNumber = new AtomicInteger(1);
 
     /**
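This hunk only shortens THREAD_NAME_SUFFIX from "_ChoreService_" to "_Chore_". A minimal sketch of a prefix-based ThreadFactory that would use such a suffix follows; the class name NamedChoreThreadFactory is hypothetical, and the naming scheme (prefix + suffix + counter) and the daemon flag are assumptions, not taken from this diff.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical naming ThreadFactory using the shorter "_Chore_" suffix. */
final class NamedChoreThreadFactory implements ThreadFactory {
  private static final String THREAD_NAME_SUFFIX = "_Chore_";
  private final String threadPrefix;
  private final AtomicInteger threadNumber = new AtomicInteger(1);

  NamedChoreThreadFactory(String threadPrefix) {
    this.threadPrefix = threadPrefix;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Produces names like "master_Chore_1", "master_Chore_2", ...
    Thread thread = new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
    thread.setDaemon(true);
    return thread;
  }
}
```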

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 609e9a5..3789f71 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -146,7 +146,7 @@ public final class HConstants {
   public static final int DEFAULT_HBASE_BALANCER_PERIOD = 300000;
 
   /** The name of the ensemble table */
-  public static final String ENSEMBLE_TABLE_NAME = "hbase:ensemble";
+  public static final TableName ENSEMBLE_TABLE_NAME = TableName.valueOf("hbase:ensemble");
 
   /** Config for pluggable region normalizer */
   public static final String HBASE_MASTER_NORMALIZER_CLASS =

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index 9b9755b..cba03c0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -89,7 +89,12 @@ public final class TableName implements Comparable<TableName> {
   public static final String OLD_META_STR = ".META.";
   public static final String OLD_ROOT_STR = "-ROOT-";
 
-
+  /**
+   * @return True if <code>tn</code> is the hbase:meta table name.
+   */
+  public static boolean isMetaTableName(final TableName tn) {
+    return tn.equals(TableName.META_TABLE_NAME);
+  }
 
   /**
    * TableName for old -ROOT- table. It is used to read/process old WALs which have

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
index 216fe0c..8536ce2 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
index f6c9cb8..7e1f836 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
@@ -46,12 +46,10 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
   String RIT_COUNT_OVER_THRESHOLD_NAME = "ritCountOverThreshold";
   String RIT_OLDEST_AGE_NAME = "ritOldestAge";
   String RIT_DURATION_NAME = "ritDuration";
-  String ASSIGN_TIME_NAME = "assign";
-  String BULK_ASSIGN_TIME_NAME = "bulkAssign";
-
-  void updateAssignmentTime(long time);
 
-  void updateBulkAssignTime(long time);
+  String OPERATION_COUNT_NAME = "operationCount";
+  String ASSIGN_TIME_NAME = "assign";
+  String UNASSIGN_TIME_NAME = "unassign";
 
   /**
    * Set the number of regions in transition.
@@ -75,4 +73,19 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
   void setRITOldestAge(long age);
 
   void updateRitDuration(long duration);
+
+  /**
+   * Increment the count of assignment operations (assign/unassign).
+   */
+  void incrementOperationCounter();
+
+  /**
+   * Add the time taken to perform the last assign operation.
+   */
+  void updateAssignTime(long time);
+
+  /**
+   * Add the time taken to perform the last unassign operation.
+   */
+  void updateUnassignTime(long time);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
index ab504f5..722358d 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 @InterfaceAudience.Private
@@ -32,8 +33,10 @@ public class MetricsAssignmentManagerSourceImpl
   private MutableGaugeLong ritCountOverThresholdGauge;
   private MutableGaugeLong ritOldestAgeGauge;
   private MetricHistogram ritDurationHisto;
+
+  private MutableFastCounter operationCounter;
   private MetricHistogram assignTimeHisto;
-  private MetricHistogram bulkAssignTimeHisto;
+  private MetricHistogram unassignTimeHisto;
 
   public MetricsAssignmentManagerSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
@@ -49,31 +52,40 @@ public class MetricsAssignmentManagerSourceImpl
     ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l);
     ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l);
     ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l);
-    assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME);
-    bulkAssignTimeHisto = metricsRegistry.newTimeHistogram(BULK_ASSIGN_TIME_NAME);
+    operationCounter = metricsRegistry.getCounter(OPERATION_COUNT_NAME, 0l);
     ritDurationHisto = metricsRegistry.newTimeHistogram(RIT_DURATION_NAME);
+    assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME);
+    unassignTimeHisto = metricsRegistry.newTimeHistogram(UNASSIGN_TIME_NAME);
   }
 
   @Override
-  public void updateAssignmentTime(long time) {
-    assignTimeHisto.add(time);
+  public void setRIT(final int ritCount) {
+    ritGauge.set(ritCount);
   }
 
   @Override
-  public void updateBulkAssignTime(long time) {
-    bulkAssignTimeHisto.add(time);
+  public void setRITCountOverThreshold(final int ritCount) {
+    ritCountOverThresholdGauge.set(ritCount);
   }
 
-  public void setRIT(int ritCount) {
-    ritGauge.set(ritCount);
+  @Override
+  public void setRITOldestAge(final long age) {
+    ritOldestAgeGauge.set(age);
   }
 
-  public void setRITCountOverThreshold(int ritCount) {
-    ritCountOverThresholdGauge.set(ritCount);
+  @Override
+  public void incrementOperationCounter() {
+    operationCounter.incr();
   }
 
-  public void setRITOldestAge(long ritCount) {
-    ritOldestAgeGauge.set(ritCount);
+  @Override
+  public void updateAssignTime(final long time) {
+    assignTimeHisto.add(time);
+  }
+
+  @Override
+  public void updateUnassignTime(final long time) {
+    unassignTimeHisto.add(time);
   }
 
   @Override
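The metrics source now exposes an operation counter plus separate assign and unassign time histograms in place of the old assign/bulk-assign pair. As a minimal sketch of how a caller might feed them, the interface below mirrors only the three methods added in this patch; RecordingAssignmentMetrics and AssignmentTimer are hypothetical, and recording elapsed time in milliseconds is an assumption about the histogram unit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Mirrors only the three methods this patch adds to MetricsAssignmentManagerSource. */
interface AssignmentMetrics {
  void incrementOperationCounter();
  void updateAssignTime(long time);
  void updateUnassignTime(long time);
}

/** Hypothetical in-memory stand-in for the metrics2 counter and time histograms. */
final class RecordingAssignmentMetrics implements AssignmentMetrics {
  long operations;
  final List<Long> assignTimes = new ArrayList<>();
  final List<Long> unassignTimes = new ArrayList<>();

  @Override public void incrementOperationCounter() { operations++; }
  @Override public void updateAssignTime(long time) { assignTimes.add(time); }
  @Override public void updateUnassignTime(long time) { unassignTimes.add(time); }
}

final class AssignmentTimer {
  private AssignmentTimer() {}

  /** Runs one assign or unassign, counts it, and records its elapsed time in milliseconds. */
  static void timed(AssignmentMetrics metrics, boolean isAssign, Runnable operation) {
    long start = System.nanoTime();
    try {
      operation.run();
    } finally {
      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      metrics.incrementOperationCounter();
      if (isAssign) {
        metrics.updateAssignTime(elapsedMs);
      } else {
        metrics.updateUnassignTime(elapsedMs);
      }
    }
  }
}
```

Recording in a finally block means failed operations are still counted and timed; whether that is desirable is a design choice the patch itself does not settle.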

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index 646bc1f..fc80c9c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -25,13 +25,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.annotations.VisibleForTesting;
 
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class);
-
   private final ReentrantLock schedLock = new ReentrantLock();
   private final Condition schedWaitCond = schedLock.newCondition();
   private boolean running = false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
index 19ba28c..e11c23c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
@@ -19,24 +19,25 @@
 package org.apache.hadoop.hbase.procedure2;
 
 /**
- * Locking for mutual exclusion between procedures. Only by procedure framework internally.
+ * Locking for mutual exclusion between procedures. Used only by procedure framework internally.
  * {@link LockAndQueue} has two purposes:
  * <ol>
- *   <li>Acquire/release exclusive/shared locks</li>
- *   <li>Maintain a list of procedures waiting for this lock<br>
- *      To do so, {@link LockAndQueue} extends {@link ProcedureDeque} class. Using inheritance over
- *      composition for this need is unusual, but the choice is motivated by million regions
- *      assignment case as it will reduce memory footprint and number of objects to be GCed.
+ *   <li>Acquire/release exclusive/shared locks.</li>
+ *   <li>Maintains a list of procedures waiting on this lock.
+ *      {@link LockAndQueue} extends {@link ProcedureDeque} class; blocked Procedures are added
+ *      to the inherited Deque. Using inheritance over composition to keep the Deque of waiting
+ *      Procedures is unusual, but we do it this way because some deployments have millions
+ *      of regions, and this layout uses less memory.
  * </ol>
  *
- * NOT thread-safe. Needs external concurrency control. For eg. Uses in MasterProcedureScheduler are
+ * <p>NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
  * guarded by schedLock().
  * <br>
  * There is no need of 'volatile' keyword for member variables because of memory synchronization
  * guarantees of locks (see 'Memory Synchronization',
  * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html)
  * <br>
- * We do not implement Lock interface because we need exclusive + shared locking, and also
+ * We do not implement Lock interface because we need exclusive and shared locking, and also
  * because try-lock functions require procedure id.
  * <br>
  * We do not use ReentrantReadWriteLock directly because of its high memory overhead.
@@ -104,6 +105,9 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
     return true;
   }
 
+  /**
+   * @return True if we released a lock.
+   */
   public boolean releaseExclusiveLock(final Procedure proc) {
     if (isLockOwner(proc.getProcId())) {
       exclusiveLockProcIdOwner = Long.MIN_VALUE;
@@ -111,4 +115,11 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
     }
     return false;
   }
-}
+
+  @Override
+  public String toString() {
+    return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") +
+      ", sharedLockCount=" + getSharedLockCount() +
+      ", waitingProcCount=" + size();
+  }
+}
\ No newline at end of file
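The LockAndQueue Javadoc describes one object that is both an exclusive/shared lock and the deque of procedures waiting on it. The following is a simplified, self-contained sketch of that layout, not the HBase class: SimpleLockAndQueue is a hypothetical name, procedures are represented by plain long ids, and the re-acquire and release rules are assumptions chosen for illustration. Like the original, it is not thread-safe and relies on the caller for external synchronization.

```java
import java.util.ArrayDeque;

/** Hypothetical lock that is also the deque of waiting procedure ids (inheritance, not composition). */
class SimpleLockAndQueue extends ArrayDeque<Long> {
  private long exclusiveLockProcIdOwner = Long.MIN_VALUE; // MIN_VALUE means "no owner"
  private int sharedLock = 0;

  boolean hasExclusiveLock() { return exclusiveLockProcIdOwner != Long.MIN_VALUE; }
  int getSharedLockCount() { return sharedLock; }

  boolean tryExclusiveLock(long procId) {
    if (hasExclusiveLock()) {
      return exclusiveLockProcIdOwner == procId; // the current owner may re-acquire
    }
    if (sharedLock > 0) {
      return false; // shared holders block an exclusive lock
    }
    exclusiveLockProcIdOwner = procId;
    return true;
  }

  /** Returns true only if procId owned the exclusive lock and released it. */
  boolean releaseExclusiveLock(long procId) {
    if (hasExclusiveLock() && exclusiveLockProcIdOwner == procId) {
      exclusiveLockProcIdOwner = Long.MIN_VALUE;
      return true;
    }
    return false;
  }

  boolean trySharedLock() {
    if (hasExclusiveLock()) {
      return false;
    }
    sharedLock++;
    return true;
  }

  /** Returns true when the last shared holder releases the lock. */
  boolean releaseSharedLock() {
    return sharedLock > 0 && --sharedLock == 0;
  }

  @Override
  public String toString() {
    return "exclusiveLockOwner=" + (hasExclusiveLock() ? exclusiveLockProcIdOwner : "NONE")
        + ", sharedLockCount=" + sharedLock + ", waitingProcCount=" + size();
  }
}
```

Because the waiters live in the superclass, each lock costs one object instead of a lock plus a separate deque, which is the memory argument the Javadoc makes for very large region counts.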

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index fee5250..2841697 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -253,9 +253,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
    */
   protected StringBuilder toStringSimpleSB() {
     final StringBuilder sb = new StringBuilder();
-    toStringClassDetails(sb);
 
-    sb.append(", procId=");
+    sb.append("procId=");
     sb.append(getProcId());
 
     if (hasParent()) {
@@ -275,6 +274,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
       sb.append(", failed=" + getException());
     }
 
+    sb.append(", ");
+    toStringClassDetails(sb);
+
     return sb;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
index cb90ac0..43cce3a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
@@ -50,6 +49,7 @@ public class ProcedureEvent<T> {
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + "(" + object + ")";
+    return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
+        ", suspended procedures count=" + getSuspendedProcedures().size();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 0856aa2..3145e83 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -360,8 +360,7 @@ public class ProcedureExecutor<TEnvironment> {
       assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
 
       if (debugEnabled) {
-        LOG.debug(String.format("Loading state=%s isFailed=%s: %s",
-                    proc.getState(), proc.hasException(), proc));
+        LOG.debug(String.format("Loading %s", proc));
       }
 
       Long rootProcId = getRootProcedureId(proc);
@@ -483,7 +482,7 @@ public class ProcedureExecutor<TEnvironment> {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
-    LOG.info("Starting executor threads=" + corePoolSize);
+    LOG.info("Starting executor worker threads=" + corePoolSize);
 
     // Create the Thread Group for the executors
     threadGroup = new ThreadGroup("ProcedureExecutor");
@@ -522,7 +521,9 @@ public class ProcedureExecutor<TEnvironment> {
       store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
 
     // Start the executors. Here we must have the lastProcId set.
-    LOG.debug("Start workers " + workerThreads.size());
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Start workers " + workerThreads.size());
+    }
     timeoutExecutor.start();
     for (WorkerThread worker: workerThreads) {
       worker.start();
@@ -1147,8 +1148,7 @@ public class ProcedureExecutor<TEnvironment> {
 
       if (proc.isSuccess()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Completed in " +
-              StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
+          LOG.debug("Completed " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
         }
         // Finalize the procedure state
         if (proc.getProcId() == rootProcId) {
@@ -1342,7 +1342,7 @@ public class ProcedureExecutor<TEnvironment> {
         return;
       } catch (Throwable e) {
         // Catch NullPointerExceptions or similar errors...
-        String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
+        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
         LOG.error(msg, e);
         procedure.setFailure(new RemoteProcedureException(msg, e));
       }
@@ -1674,7 +1674,7 @@ public class ProcedureExecutor<TEnvironment> {
         // if the procedure is in a waiting state again, put it back in the queue
         procedure.updateTimestamp();
         if (procedure.isWaiting()) {
-          delayed.setTimeoutTimestamp(procedure.getTimeoutTimestamp());
+          delayed.setTimeout(procedure.getTimeoutTimestamp());
           queue.add(delayed);
         }
       } else {
@@ -1752,7 +1752,7 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     @Override
-    public long getTimeoutTimestamp() {
+    public long getTimeout() {
       return timeout;
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index 16ff781..617532b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -23,13 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Keep track of the runnable procedures
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public interface ProcedureScheduler {
   /**
    * Start the scheduler
@@ -93,7 +91,7 @@ public interface ProcedureScheduler {
   Procedure poll(long timeout, TimeUnit unit);
 
   /**
-   * Mark the event has not ready.
+   * Mark the event as not ready.
    * procedures calling waitEvent() will be suspended.
    * @param event the event to mark as suspended/not ready
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
new file mode 100644
index 0000000..0e33887
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A procedure dispatcher that aggregates operations and sends them once a time delay
+ * has elapsed or a count threshold is hit. Creates its own thread pool to run RPCs with timeout.
+ * <ul>
+ * <li>Each server queue has a dispatch buffer</li>
+ * <li>Once the dispatch buffer reaches a threshold size or age, it is sent</li>
+ * </ul>
+ * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done,
+ * call {@link #stop()}.
+ */
+@InterfaceAudience.Private
+public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
+  private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class);
+
+  public static final String THREAD_POOL_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.threadpool.size";
+  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
+
+  public static final String DISPATCH_DELAY_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.delay.msec";
+  private static final int DEFAULT_DISPATCH_DELAY = 150;
+
+  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.max.queue.size";
+  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
+      new ConcurrentHashMap<TRemote, BufferNode>();
+
+  private final int operationDelay;
+  private final int queueMaxSize;
+  private final int corePoolSize;
+
+  private TimeoutExecutorThread timeoutExecutor;
+  private ThreadPoolExecutor threadPool;
+
+  protected RemoteProcedureDispatcher(Configuration conf) {
+    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
+    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
+    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
+  }
+
+  public boolean start() {
+    if (running.getAndSet(true)) {
+      LOG.warn("Already running");
+      return false;
+    }
+
+    LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize +
+      ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay);
+
+    // Create the timeout executor
+    timeoutExecutor = new TimeoutExecutorThread();
+    timeoutExecutor.start();
+
+    // Create the thread pool that will execute RPCs
+    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
+      Threads.newDaemonThreadFactory("ProcedureRemoteDispatcher", getUncaughtExceptionHandler()));
+    return true;
+  }
+
+  public boolean stop() {
+    if (!running.getAndSet(false)) {
+      return false;
+    }
+
+    LOG.info("Stopping procedure remote dispatcher");
+
+    // send stop signals
+    timeoutExecutor.sendStopSignal();
+    threadPool.shutdownNow();
+    return true;
+  }
+
+  public void join() {
+    assert !running.get() : "expected not running";
+
+    // wait the timeout executor
+    timeoutExecutor.awaitTermination();
+    timeoutExecutor = null;
+
+    // wait for the thread pool to terminate
+    threadPool.shutdownNow();
+    try {
+      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        LOG.warn("Waiting for thread-pool to terminate");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for thread-pool termination", e);
+    }
+  }
+
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        LOG.warn("Failed to execute remote procedures " + t.getName(), e);
+      }
+    };
+  }
+
+  // ============================================================================================
+  //  Node Helpers
+  // ============================================================================================
+  /**
+   * Add a node that will be able to execute remote procedures
+   * @param key the node identifier
+   */
+  public void addNode(final TRemote key) {
+    final BufferNode newNode = new BufferNode(key);
+    nodeMap.putIfAbsent(key, newNode);
+  }
+
+  /**
+   * Get the remote node that will execute remote procedures
+   * @param key the node identifier
+   */
+  public RemoteNode getNode(final TRemote key) {
+    assert key != null : "found null key for node";
+    return nodeMap.get(key);
+  }
+
+  /**
+   * Remove a remote node
+   * @param key the node identifier
+   */
+  public boolean removeNode(final TRemote key) {
+    final BufferNode node = nodeMap.remove(key);
+    if (node == null) return false;
+    node.abortOperationsInQueue();
+    return true;
+  }
+
+  // ============================================================================================
+  //  Task Helpers
+  // ============================================================================================
+  protected Future<Void> submitTask(Callable<Void> task) {
+    return threadPool.submit(task);
+  }
+
+  protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) {
+    final FutureTask<Void> futureTask = new FutureTask<Void>(task);
+    timeoutExecutor.add(new DelayedTask(futureTask, delay, unit));
+    return futureTask;
+  }
+
+  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
+  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
+
+  /**
+   * Data structure with reference to remote operation.
+   */
+  public static abstract class RemoteOperation {
+    private final RemoteProcedure remoteProcedure;
+
+    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
+      this.remoteProcedure = remoteProcedure;
+    }
+
+    public RemoteProcedure getRemoteProcedure() {
+      return remoteProcedure;
+    }
+  }
+
+  /**
+   * Remote procedure reference.
+   * @param <TEnv>
+   * @param <TRemote>
+   */
+  public interface RemoteProcedure<TEnv, TRemote> {
+    RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
+    void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
+    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
+  }
+
+  /**
+   * Account of what procedures are running on remote node.
+   * @param <TEnv>
+   * @param <TRemote>
+   */
+  public interface RemoteNode<TEnv, TRemote> {
+    TRemote getKey();
+    void add(RemoteProcedure<TEnv, TRemote> operation);
+    void dispatch();
+  }
+
+  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
+      final TRemote remote, final Set<RemoteProcedure> operations) {
+    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
+    for (RemoteProcedure proc: operations) {
+      RemoteOperation operation = proc.remoteCallBuild(env, remote);
+      requestByType.put(operation.getClass(), operation);
+    }
+    return requestByType;
+  }
+
+  protected <T extends RemoteOperation> List<T> fetchType(
+      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
+    return (List<T>)requestByType.removeAll(type);
+  }
+
+  // ============================================================================================
+  //  Timeout Helpers
+  // ============================================================================================
+  private final class TimeoutExecutorThread extends Thread {
+    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
+
+    public TimeoutExecutorThread() {
+      super("ProcedureDispatcherTimeoutThread");
+    }
+
+    @Override
+    public void run() {
+      while (running.get()) {
+        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+        if (task == null || task == DelayedUtil.DELAYED_POISON) {
+          // the executor may be shutting down, and the task is just the shutdown request
+          continue;
+        }
+
+        if (task instanceof DelayedTask) {
+          threadPool.execute(((DelayedTask)task).getObject());
+        } else {
+          ((BufferNode)task).dispatch();
+        }
+      }
+    }
+
+    public void add(final DelayedWithTimeout delayed) {
+      queue.add(delayed);
+    }
+
+    public void remove(final DelayedWithTimeout delayed) {
+      queue.remove(delayed);
+    }
+
+    public void sendStopSignal() {
+      queue.add(DelayedUtil.DELAYED_POISON);
+    }
+
+    public void awaitTermination() {
+      try {
+        final long startTime = EnvironmentEdgeManager.currentTime();
+        for (int i = 0; isAlive(); ++i) {
+          sendStopSignal();
+          join(250);
+          if (i > 0 && (i % 8) == 0) {
+            LOG.warn("Waiting termination of thread " + getName() + ", " +
+              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn(getName() + " join wait got interrupted", e);
+      }
+    }
+  }
+
+  // ============================================================================================
+  //  Internals Helpers
+  // ============================================================================================
+
+  /**
+   * Node that contains a set of RemoteProcedures
+   */
+  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
+      implements RemoteNode<TEnv, TRemote> {
+    private Set<RemoteProcedure> operations;
+
+    protected BufferNode(final TRemote key) {
+      super(key, 0);
+    }
+
+    public TRemote getKey() {
+      return getObject();
+    }
+
+    public synchronized void add(final RemoteProcedure operation) {
+      if (this.operations == null) {
+        this.operations = new HashSet<>();
+        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
+        timeoutExecutor.add(this);
+      }
+      this.operations.add(operation);
+      if (this.operations.size() > queueMaxSize) {
+        timeoutExecutor.remove(this);
+        dispatch();
+      }
+    }
+
+    public synchronized void dispatch() {
+      if (operations != null) {
+        remoteDispatch(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    public synchronized void abortOperationsInQueue() {
+      if (operations != null) {
+        abortPendingOperations(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + ", operations=" + this.operations;
+    }
+  }
+
+  /**
+   * Delayed object that holds a FutureTask;
+   * used to submit something to the thread pool later.
+   */
+  private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> {
+    public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) {
+      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
+    }
+  };
+}
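The batching performed by `BufferNode.add()` and `dispatch()` above can be illustrated with a minimal standalone sketch. It assumes plain `String` operations instead of `RemoteProcedure`, `System.currentTimeMillis()` instead of `EnvironmentEdgeManager`, and a caller-supplied `BiConsumer` in place of `remoteDispatch()`; the class name `BatchBuffer` is hypothetical. As in the patch, the first operation arms a deadline on a `DelayQueue`, and exceeding the size threshold removes the buffer from the queue and flushes it immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for RemoteProcedureDispatcher.BufferNode: buffers
// operations for one server and flushes them when the size threshold is
// exceeded or when the delay armed by the first operation expires.
final class BatchBuffer implements Delayed {
  private final String server;
  private final long delayMs;
  private final int maxQueueSize;
  private final BiConsumer<String, List<String>> sender; // stand-in for remoteDispatch()
  private final DelayQueue<BatchBuffer> timer;           // stand-in for the timeout executor queue
  private List<String> pending;                          // null when nothing is buffered
  private volatile long deadlineMs;

  BatchBuffer(String server, long delayMs, int maxQueueSize,
      BiConsumer<String, List<String>> sender, DelayQueue<BatchBuffer> timer) {
    this.server = server;
    this.delayMs = delayMs;
    this.maxQueueSize = maxQueueSize;
    this.sender = sender;
    this.timer = timer;
  }

  synchronized void add(String op) {
    if (pending == null) {
      // First operation of a new batch: arm the deadline and register with the timer.
      pending = new ArrayList<>();
      deadlineMs = System.currentTimeMillis() + delayMs;
      timer.add(this);
    }
    pending.add(op);
    if (pending.size() > maxQueueSize) {
      // Size threshold exceeded: cancel the timer entry and flush right away.
      timer.remove(this);
      dispatch();
    }
  }

  synchronized void dispatch() {
    if (pending != null) {
      sender.accept(server, pending);
      pending = null;
    }
  }

  @Override
  public long getDelay(TimeUnit unit) {
    return unit.convert(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed other) {
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
  }
}
```

A timer thread in the style of `TimeoutExecutorThread` would loop on `timer.take().dispatch()` while running. Because both paths go through the synchronized `dispatch()`, and `pending` is reset to null after each flush, a buffered operation is sent at most once whether the deadline or the size threshold fires first.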

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 5c3a4c7..ea2a41f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -108,6 +108,9 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
     if (aborted.get() && isRollbackSupported(getCurrentState())) {
       setAbortFailure(getClass().getSimpleName(), "abort requested");
     } else {
+      if (aborted.get()) {
+        LOG.warn("ignoring abort request " + state);
+      }
       setNextState(getStateId(state));
     }
   }
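With the new branch in `setNextState()`, an abort request is honored only when the current state supports rollback; otherwise the procedure logs a warning and moves on to the next state. A minimal model of that decision follows; `AbortableStateMachine` and the `IntPredicate` standing in for `isRollbackSupported()` are hypothetical, and the warning goes to `System.err` rather than the HBase logger.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntPredicate;

// Hypothetical model of the abort check in StateMachineProcedure.setNextState().
final class AbortableStateMachine {
  private final AtomicBoolean aborted = new AtomicBoolean(false);
  // Stand-in for isRollbackSupported(state).
  private final IntPredicate rollbackSupported;
  private int currentState;
  // Non-null once the abort has been honored (stand-in for setAbortFailure()).
  private String failure;

  AbortableStateMachine(int initialState, IntPredicate rollbackSupported) {
    this.currentState = initialState;
    this.rollbackSupported = rollbackSupported;
  }

  void abort() {
    aborted.set(true);
  }

  void setNextState(int nextState) {
    if (aborted.get() && rollbackSupported.test(currentState)) {
      // Abort honored: record the failure and stay in the current state.
      failure = "abort requested";
    } else {
      if (aborted.get()) {
        // Abort cannot be honored from this state; warn and keep going.
        System.err.println("ignoring abort request " + nextState);
      }
      currentState = nextState;
    }
  }

  int getCurrentState() {
    return currentState;
  }

  String getFailure() {
    return failure;
  }
}
```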

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index ea34c49..3719019 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -32,13 +32,19 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 public final class DelayedUtil {
   private DelayedUtil() { }
 
+  /**
+   * Add a timeout to a Delay
+   */
   public interface DelayedWithTimeout extends Delayed {
-    long getTimeoutTimestamp();
+    long getTimeout();
   }
 
+  /**
+   * POISON implementation; used to mark special state: e.g. shutdown.
+   */
   public static final DelayedWithTimeout DELAYED_POISON = new DelayedWithTimeout() {
     @Override
-    public long getTimeoutTimestamp() {
+    public long getTimeout() {
       return 0;
     }
 
@@ -49,7 +55,7 @@ public final class DelayedUtil {
 
     @Override
     public int compareTo(final Delayed o) {
-      return Long.compare(0, DelayedUtil.getTimeoutTimestamp(o));
+      return Long.compare(0, DelayedUtil.getTimeout(o));
     }
 
     @Override
@@ -59,10 +65,13 @@ public final class DelayedUtil {
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "(POISON)";
+      return "POISON";
     }
   };
 
+  /**
+   * @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
+   */
   public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
     try {
       return queue.take();
@@ -72,33 +81,42 @@ public final class DelayedUtil {
     }
   }
 
-  public static long getRemainingTime(final TimeUnit resultUnit, final long timeoutTime) {
+  /**
+   * @return Time remaining until {@code timeout}, expressed in {@code resultUnit}; 0 if expired.
+   */
+  public static long getRemainingTime(final TimeUnit resultUnit, final long timeout) {
     final long currentTime = EnvironmentEdgeManager.currentTime();
-    if (currentTime >= timeoutTime) {
+    if (currentTime >= timeout) {
       return 0;
     }
-    return resultUnit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS);
+    return resultUnit.convert(timeout - currentTime, TimeUnit.MILLISECONDS);
   }
 
   public static int compareDelayed(final Delayed o1, final Delayed o2) {
-    return Long.compare(getTimeoutTimestamp(o1), getTimeoutTimestamp(o2));
+    return Long.compare(getTimeout(o1), getTimeout(o2));
   }
 
-  private static long getTimeoutTimestamp(final Delayed o) {
+  private static long getTimeout(final Delayed o) {
     assert o instanceof DelayedWithTimeout : "expected DelayedWithTimeout instance, got " + o;
-    return ((DelayedWithTimeout)o).getTimeoutTimestamp();
+    return ((DelayedWithTimeout)o).getTimeout();
   }
 
   public static abstract class DelayedObject implements DelayedWithTimeout {
     @Override
     public long getDelay(final TimeUnit unit) {
-      return DelayedUtil.getRemainingTime(unit, getTimeoutTimestamp());
+      return DelayedUtil.getRemainingTime(unit, getTimeout());
     }
 
     @Override
     public int compareTo(final Delayed other) {
       return DelayedUtil.compareDelayed(this, other);
     }
+
+    @Override
+    public String toString() {
+      long timeout = getTimeout();
+      return "timeout=" + timeout + ", delay=" + getDelay(TimeUnit.MILLISECONDS);
+    }
   }
 
   public static abstract class DelayedContainer<T> extends DelayedObject {
@@ -126,25 +144,25 @@ public final class DelayedUtil {
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "(" + getObject() + ")";
+      return "containedObject=" + getObject() + ", " + super.toString();
     }
   }
 
   public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
-    private long timeoutTimestamp;
+    private long timeout;
 
-    public DelayedContainerWithTimestamp(final T object, final long timeoutTimestamp) {
+    public DelayedContainerWithTimestamp(final T object, final long timeout) {
       super(object);
-      setTimeoutTimestamp(timeoutTimestamp);
+      setTimeout(timeout);
     }
 
     @Override
-    public long getTimeoutTimestamp() {
-      return timeoutTimestamp;
+    public long getTimeout() {
+      return timeout;
     }
 
-    public void setTimeoutTimestamp(final long timeoutTimestamp) {
-      this.timeoutTimestamp = timeoutTimestamp;
+    public void setTimeout(final long timeout) {
+      this.timeout = timeout;
     }
   }
-}
+}
\ No newline at end of file
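After the rename, `getTimeout()` holds an absolute deadline in milliseconds and `DelayedObject.getDelay()` is derived from it, while `DELAYED_POISON` reports a timeout of 0 and compares as earlier than any positive timeout; `sendStopSignal()` relies on this to wake a thread blocked in `take()`. The standalone sketch below mirrors that contract under stated assumptions: the names `TimedItem`, `TimedItems` and `Named` are hypothetical, `System.currentTimeMillis()` replaces `EnvironmentEdgeManager`, and the sketch's poison pill also reports a delay of 0.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical mirror of DelayedUtil.DelayedWithTimeout: getTimeout() is an
// absolute deadline in epoch milliseconds.
interface TimedItem extends Delayed {
  long getTimeout();
}

final class TimedItems {
  private TimedItems() { }

  // Same shape as DelayedUtil.getRemainingTime(): 0 once the deadline has passed.
  static long remaining(TimeUnit unit, long timeout) {
    long now = System.currentTimeMillis();
    return now >= timeout ? 0 : unit.convert(timeout - now, TimeUnit.MILLISECONDS);
  }

  // Poison pill: timeout 0 means it is always due and sorts ahead of real entries.
  static final TimedItem POISON = new TimedItem() {
    public long getTimeout() { return 0; }
    public long getDelay(TimeUnit unit) { return 0; }
    public int compareTo(Delayed o) { return Long.compare(0, ((TimedItem) o).getTimeout()); }
    public String toString() { return "POISON"; }
  };

  // A named entry that becomes due delayMs after construction.
  static final class Named implements TimedItem {
    final String name;
    private final long timeout;

    Named(String name, long delayMs) {
      this.name = name;
      this.timeout = System.currentTimeMillis() + delayMs;
    }

    public long getTimeout() { return timeout; }
    public long getDelay(TimeUnit unit) { return remaining(unit, timeout); }
    public int compareTo(Delayed o) { return Long.compare(timeout, ((TimedItem) o).getTimeout()); }
  }
}
```

In the sketch, the pill is returned before any entry still waiting in the queue, so a consumer loop like the dispatcher's `TimeoutExecutorThread.run()` needs to re-check its `running` flag after each `take()`.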

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 226666f..0240465 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.hbase.procedure2;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.Callable;
 import java.util.ArrayList;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,18 +39,14 @@ import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.Threads;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class ProcedureTestingUtility {
   private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
index a2cd70f..019b456 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
@@ -80,7 +80,7 @@ public class TestDelayedUtil {
     }
 
     @Override
-    public long getTimeoutTimestamp() {
+    public long getTimeout() {
       return 0;
     }
   }

