hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [23/24] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.
Date Sun, 30 Apr 2017 22:14:33 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
index c51a437..95d77a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Set;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -67,7 +67,7 @@ public class ClusterStatus extends VersionedWritable {
   private Collection<ServerName> deadServers;
   private ServerName master;
   private Collection<ServerName> backupMasters;
-  private Set<RegionState> intransition;
+  private List<RegionState> intransition;
   private String clusterId;
   private String[] masterCoprocessors;
   private Boolean balancerOn;
@@ -77,7 +77,7 @@ public class ClusterStatus extends VersionedWritable {
       final Collection<ServerName> deadServers,
       final ServerName master,
       final Collection<ServerName> backupMasters,
-      final Set<RegionState> rit,
+      final List<RegionState> rit,
       final String[] masterCoprocessors,
       final Boolean balancerOn) {
     this.hbaseVersion = hbaseVersion;
@@ -248,7 +248,7 @@ public class ClusterStatus extends VersionedWritable {
   }
 
   @InterfaceAudience.Private
-  public Set<RegionState> getRegionsInTransition() {
+  public List<RegionState> getRegionsInTransition() {
     return this.intransition;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index bc93cc6..5b9cbec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -168,6 +169,15 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
   }
 
   /**
+   * @return Return a String of short, printable names for <code>hris</code>
+   * (usually the encoded name) for use in logging.
+   */
+  public static String getShortNameToLog(final List<HRegionInfo> hris) {
+    return hris.stream().map(hri -> hri.getShortNameToLog()).
+        collect(Collectors.toList()).toString();
+  }
+
+  /**
    * Use logging.
    * @param encodedRegionName The encoded regionname.
    * @return <code>hbase:meta</code> if passed <code>1028785192</code> else returns

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 15bc132..9f1be9f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -1966,8 +1966,8 @@ public class MetaTableAccessor {
    * @param regionsInfo list of regions to be deleted from META
    * @throws IOException
    */
-  public static void deleteRegions(Connection connection,
-                                   List<HRegionInfo> regionsInfo, long ts) throws IOException {
+  public static void deleteRegions(Connection connection, List<HRegionInfo> regionsInfo, long ts)
+  throws IOException {
     List<Delete> deletes = new ArrayList<>(regionsInfo.size());
     for (HRegionInfo hri: regionsInfo) {
       Delete e = new Delete(hri.getRegionName());
@@ -2002,10 +2002,10 @@ public class MetaTableAccessor {
     }
     mutateMetaTable(connection, mutation);
     if (regionsToRemove != null && regionsToRemove.size() > 0) {
-      LOG.debug("Deleted " + regionsToRemove);
+      LOG.debug("Deleted " + HRegionInfo.getShortNameToLog(regionsToRemove));
     }
     if (regionsToAdd != null && regionsToAdd.size() > 0) {
-      LOG.debug("Added " + regionsToAdd);
+      LOG.debug("Added " + HRegionInfo.getShortNameToLog(regionsToAdd));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 6859cb3..9b3f1a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1323,6 +1323,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
         return stub.mergeTableRegions(controller, request);
       }
 
+      public MasterProtos.DispatchMergingRegionsResponse dispatchMergingRegions(
+          RpcController controller, MasterProtos.DispatchMergingRegionsRequest request)
+          throws ServiceException {
+        return stub.dispatchMergingRegions(controller, request);
+      }
+
       @Override
       public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
           MasterProtos.AssignRegionRequest request) throws ServiceException {
@@ -1342,6 +1348,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       }
 
       @Override
+      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
+          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
+        return stub.splitRegion(controller, request);
+      }
+
+      @Override
       public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
           MasterProtos.DeleteTableRequest request) throws ServiceException {
         return stub.deleteTable(controller, request);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index e3b5b12..648fdca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -484,4 +484,15 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
     return stub.listReplicationPeers(controller, request);
   }
 
-}
+  @Override
+  public SplitTableRegionResponse splitRegion(RpcController controller, SplitTableRegionRequest request)
+      throws ServiceException {
+    return stub.splitRegion(controller, request);
+  }
+
+  @Override
+  public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
+      DispatchMergingRegionsRequest request) throws ServiceException {
+    return stub.dispatchMergingRegions(controller, request);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index e69b42d..08533b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -226,8 +226,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
       switch (idleEvt.state()) {
         case WRITER_IDLE:
           if (id2Call.isEmpty()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("shutdown connection to " + conn.remoteId().address
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("shutdown connection to " + conn.remoteId().address
                   + " because idle for a long time");
             }
             // It may happen that there are still some pending calls in the event loop queue and

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index b5a7959..98d2256 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -129,7 +129,11 @@ abstract class RpcConnection {
       authMethod = AuthMethod.KERBEROS;
     }
 
-    if (LOG.isDebugEnabled()) {
+    // Log if debug AND non-default auth, else if trace enabled.
+    // No point logging obvious.
+    if ((LOG.isDebugEnabled() && !authMethod.equals(AuthMethod.SIMPLE)) ||
+        LOG.isTraceEnabled()) {
+      // Only log if not default auth.
       LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName
           + ", sasl=" + useSasl);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
index 0e12ef6..7116763 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
@@ -36,10 +36,8 @@ public class RegionState {
   @InterfaceStability.Evolving
   public enum State {
     OFFLINE,        // region is in an offline state
-    PENDING_OPEN,   // same as OPENING, to be removed
     OPENING,        // server has begun to open but not yet done
     OPEN,           // server opened region and updated meta
-    PENDING_CLOSE,  // same as CLOSING, to be removed
     CLOSING,        // server has begun to close but not yet done
     CLOSED,         // server closed region and updated meta
     SPLITTING,      // server started split of a region
@@ -64,18 +62,12 @@ public class RegionState {
       case OFFLINE:
         rs = ClusterStatusProtos.RegionState.State.OFFLINE;
         break;
-      case PENDING_OPEN:
-        rs = ClusterStatusProtos.RegionState.State.PENDING_OPEN;
-        break;
       case OPENING:
         rs = ClusterStatusProtos.RegionState.State.OPENING;
         break;
       case OPEN:
         rs = ClusterStatusProtos.RegionState.State.OPEN;
         break;
-      case PENDING_CLOSE:
-        rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE;
-        break;
       case CLOSING:
         rs = ClusterStatusProtos.RegionState.State.CLOSING;
         break;
@@ -124,8 +116,6 @@ public class RegionState {
         state = OFFLINE;
         break;
       case PENDING_OPEN:
-        state = PENDING_OPEN;
-        break;
       case OPENING:
         state = OPENING;
         break;
@@ -133,8 +123,6 @@ public class RegionState {
         state = OPEN;
         break;
       case PENDING_CLOSE:
-        state = PENDING_CLOSE;
-        break;
       case CLOSING:
         state = CLOSING;
         break;
@@ -231,22 +219,16 @@ public class RegionState {
     this.ritDuration += (this.stamp - previousStamp);
   }
 
-  /**
-   * PENDING_CLOSE (to be removed) is the same as CLOSING
-   */
   public boolean isClosing() {
-    return state == State.PENDING_CLOSE || state == State.CLOSING;
+    return state == State.CLOSING;
   }
 
   public boolean isClosed() {
     return state == State.CLOSED;
   }
 
-  /**
-   * PENDING_OPEN (to be removed) is the same as OPENING
-   */
   public boolean isOpening() {
-    return state == State.PENDING_OPEN || state == State.OPENING;
+    return state == State.OPENING;
   }
 
   public boolean isOpened() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index d4c4231..2bf386a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,19 +20,19 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -90,11 +90,13 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.procedure2.LockInfo;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
 import org.apache.hadoop.hbase.quotas.QuotaScope;
 import org.apache.hadoop.hbase.quotas.QuotaType;
 import org.apache.hadoop.hbase.quotas.ThrottleType;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.Authorizations;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
@@ -108,8 +110,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -175,6 +175,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
@@ -1840,33 +1841,6 @@ public final class ProtobufUtil {
   }
 
   /**
-   * A helper to close a region for split or merge
-   * using admin protocol.
-   *
-   * @param controller RPC controller
-   * @param admin Admin service
-   * @param server the RS that hosts the target region
-   * @param regionInfo the target region info
-   * @return true if the region is closed
-   * @throws IOException
-   */
-  public static boolean closeRegionForSplitOrMerge(
-      final RpcController controller,
-      final AdminService.BlockingInterface admin,
-      final ServerName server,
-      final HRegionInfo... regionInfo) throws IOException {
-    CloseRegionForSplitOrMergeRequest closeRegionForRequest =
-        ProtobufUtil.buildCloseRegionForSplitOrMergeRequest(server, regionInfo);
-    try {
-      CloseRegionForSplitOrMergeResponse response =
-          admin.closeRegionForSplitOrMerge(controller, closeRegionForRequest);
-      return ResponseConverter.isClosed(response);
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
-    }
-  }
-
-  /**
    * A helper to warmup a region given a region name
    * using admin protocol
    *
@@ -2018,6 +1992,46 @@ public final class ProtobufUtil {
     }
   }
 
+  /**
+   * A helper to merge regions using admin protocol. Send request to
+   * regionserver.
+   * @param admin
+   * @param region_a
+   * @param region_b
+   * @param forcible true if doing a compulsory merge, otherwise we will only merge
+   *          two adjacent regions
+   * @param user effective user
+   * @throws IOException
+   */
+  public static void mergeRegions(final RpcController controller,
+      final AdminService.BlockingInterface admin,
+      final HRegionInfo region_a, final HRegionInfo region_b,
+      final boolean forcible, final User user) throws IOException {
+    final MergeRegionsRequest request = ProtobufUtil.buildMergeRegionsRequest(
+        region_a.getRegionName(), region_b.getRegionName(),forcible);
+    if (user != null) {
+      try {
+        user.runAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            admin.mergeRegions(controller, request);
+            return null;
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    } else {
+      try {
+        admin.mergeRegions(controller, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+  }
+
 // End helpers for Admin
 
   /*
@@ -3050,8 +3064,8 @@ public final class ProtobufUtil {
       backupMasters.add(ProtobufUtil.toServerName(sn));
     }
 
-    Set<RegionState> rit = null;
-    rit = new HashSet<>(proto.getRegionsInTransitionList().size());
+    List<RegionState> rit =
+      new ArrayList<>(proto.getRegionsInTransitionList().size());
     for (RegionInTransition region : proto.getRegionsInTransitionList()) {
       RegionState value = RegionState.convert(region.getRegionState());
       rit.add(value);
@@ -3210,26 +3224,6 @@ public final class ProtobufUtil {
   }
 
   /**
-   * Create a CloseRegionForSplitOrMergeRequest for given regions
-   *
-   * @param server the RS server that hosts the region
-   * @param regionsToClose the info of the regions to close
-   * @return a CloseRegionForSplitRequest
-   */
-  public static CloseRegionForSplitOrMergeRequest buildCloseRegionForSplitOrMergeRequest(
-      final ServerName server,
-      final HRegionInfo... regionsToClose) {
-    CloseRegionForSplitOrMergeRequest.Builder builder =
-        CloseRegionForSplitOrMergeRequest.newBuilder();
-    for(int i = 0; i < regionsToClose.length; i++) {
-        RegionSpecifier regionToClose = RequestConverter.buildRegionSpecifier(
-          RegionSpecifierType.REGION_NAME, regionsToClose[i].getRegionName());
-        builder.addRegion(regionToClose);
-    }
-    return builder.build();
-  }
-
-  /**
     * Create a CloseRegionRequest for a given encoded region name
     *
     * @param encodedRegionName the name of the region to close
@@ -3267,6 +3261,28 @@ public final class ProtobufUtil {
      return builder.build();
    }
 
+   /**
+    * Create a MergeRegionsRequest for the given regions
+    * @param regionA name of region a
+    * @param regionB name of region b
+    * @param forcible true if it is a compulsory merge
+    * @return a MergeRegionsRequest
+    */
+   public static MergeRegionsRequest buildMergeRegionsRequest(
+       final byte[] regionA, final byte[] regionB, final boolean forcible) {
+     MergeRegionsRequest.Builder builder = MergeRegionsRequest.newBuilder();
+     RegionSpecifier regionASpecifier = RequestConverter.buildRegionSpecifier(
+         RegionSpecifierType.REGION_NAME, regionA);
+     RegionSpecifier regionBSpecifier = RequestConverter.buildRegionSpecifier(
+         RegionSpecifierType.REGION_NAME, regionB);
+     builder.setRegionA(regionASpecifier);
+     builder.setRegionB(regionBSpecifier);
+     builder.setForcible(forcible);
+     // send the master's wall clock time as well, so that the RS can refer to it
+     builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
+     return builder.build();
+   }
+
   /**
    * Get a ServerName from the passed in data bytes.
    * @param data Data with a serialize server name in it; can handle the old style

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 366e050..de2544a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -118,7 +118,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -1114,19 +1113,6 @@ public final class RequestConverter {
     return builder.build();
   }
 
-  public static SplitTableRegionRequest buildSplitTableRegionRequest(
-      final HRegionInfo regionInfo,
-      final byte[] splitPoint,
-      final long nonceGroup,
-      final long nonce) {
-    SplitTableRegionRequest.Builder builder = SplitTableRegionRequest.newBuilder();
-    builder.setRegionInfo(HRegionInfo.convert(regionInfo));
-    builder.setSplitRow(UnsafeByteOperations.unsafeWrap(splitPoint));
-    builder.setNonceGroup(nonceGroup);
-    builder.setNonce(nonce);
-    return builder.build();
-  }
-
   /**
    * Create a protocol buffer AssignRegionRequest
    *
@@ -1509,7 +1495,7 @@ public final class RequestConverter {
   /**
    * Create a RegionOpenInfo based on given region info and version of offline node
    */
-  private static RegionOpenInfo buildRegionOpenInfo(
+  public static RegionOpenInfo buildRegionOpenInfo(
       final HRegionInfo region,
       final List<ServerName> favoredNodes, Boolean openForReplay) {
     RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index ecadbbc..c489628 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.SingleResponse;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
@@ -254,18 +253,6 @@ public final class ResponseConverter {
   }
 
   /**
-   * Check if the region is closed from a CloseRegionForSplitResponse
-   *
-   * @param proto the CloseRegionForSplitResponse
-   * @return the region close state
-   */
-  public static boolean isClosed
-      (final CloseRegionForSplitOrMergeResponse proto) {
-    if (proto == null || !proto.hasClosed()) return false;
-    return proto.getClosed();
-  }
-
-  /**
    * A utility to build a GetServerInfoResponse.
    *
    * @param serverName

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index afab54a..c11d896 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -439,6 +439,10 @@ public class MetaTableLocator {
    */
   public static void setMetaLocation(ZooKeeperWatcher zookeeper,
       ServerName serverName, int replicaId, RegionState.State state) throws KeeperException {
+    if (serverName == null) {
+      LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
+      return;
+    }
     LOG.info("Setting hbase:meta region location in ZooKeeper as " + serverName);
     // Make the MetaRegionServer pb and then get its bytes and save this as
     // the znode content.
@@ -448,7 +452,8 @@ public class MetaTableLocator {
       .setState(state.convert()).build();
     byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
     try {
-      ZKUtil.setData(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId), data);
+      ZKUtil.setData(zookeeper,
+          zookeeper.znodePaths.getZNodeForReplica(replicaId), data);
     } catch(KeeperException.NoNodeException nne) {
       if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
         LOG.debug("META region location doesn't exist, create it");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index 6104c22..36dabdd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -80,12 +80,11 @@ public class ProcedureInfo implements Cloneable {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("Procedure=");
     sb.append(procName);
-    sb.append(" (id=");
+    sb.append(" pid=");
     sb.append(procId);
     if (hasParentId()) {
-      sb.append(", parent=");
+      sb.append(", ppid=");
       sb.append(parentId);
     }
     if (hasOwner()) {
@@ -107,7 +106,6 @@ public class ProcedureInfo implements Cloneable {
       sb.append(this.exception.getMessage());
       sb.append("\"");
     }
-    sb.append(")");
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
index fa7bbec..2ebf8c9 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
@@ -47,6 +47,7 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
   String RIT_OLDEST_AGE_NAME = "ritOldestAge";
   String RIT_DURATION_NAME = "ritDuration";
   String ASSIGN_TIME_NAME = "assign";
+  String UNASSIGN_TIME_NAME = "unassign";
   String BULK_ASSIGN_TIME_NAME = "bulkAssign";
 
   String RIT_COUNT_DESC = "Current number of Regions In Transition (Gauge).";
@@ -56,9 +57,7 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
   String RIT_DURATION_DESC =
       "Total durations in milliseconds for all Regions in Transition (Histogram).";
 
-  void updateAssignmentTime(long time);
-
-  void updateBulkAssignTime(long time);
+  String OPERATION_COUNT_NAME = "operationCount";
 
   /**
    * Set the number of regions in transition.
@@ -82,4 +81,19 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
   void setRITOldestAge(long age);
 
   void updateRitDuration(long duration);
+
+  /**
+   * Increment the count of assignment operation (assign/unassign).
+   */
+  void incrementOperationCounter();
+
+  /**
+   * Add the time taken to perform the last assign operation
+   */
+  void updateAssignTime(long time);
+
+  /**
+   * Add the time taken to perform the last unassign operation
+   */
+  void updateUnassignTime(long time);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
index faae044..14b7e71 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 @InterfaceAudience.Private
@@ -32,8 +33,10 @@ public class MetricsAssignmentManagerSourceImpl
   private MutableGaugeLong ritCountOverThresholdGauge;
   private MutableGaugeLong ritOldestAgeGauge;
   private MetricHistogram ritDurationHisto;
+
+  private MutableFastCounter operationCounter;
   private MetricHistogram assignTimeHisto;
-  private MetricHistogram bulkAssignTimeHisto;
+  private MetricHistogram unassignTimeHisto;
 
   public MetricsAssignmentManagerSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
@@ -51,30 +54,39 @@ public class MetricsAssignmentManagerSourceImpl
         RIT_COUNT_OVER_THRESHOLD_DESC,0l);
     ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, RIT_OLDEST_AGE_DESC, 0l);
     assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME);
-    bulkAssignTimeHisto = metricsRegistry.newTimeHistogram(BULK_ASSIGN_TIME_NAME);
+    unassignTimeHisto = metricsRegistry.newTimeHistogram(UNASSIGN_TIME_NAME);
     ritDurationHisto = metricsRegistry.newTimeHistogram(RIT_DURATION_NAME, RIT_DURATION_DESC);
+    operationCounter = metricsRegistry.getCounter(OPERATION_COUNT_NAME, 0l);
   }
 
   @Override
-  public void updateAssignmentTime(long time) {
-    assignTimeHisto.add(time);
+  public void setRIT(final int ritCount) {
+    ritGauge.set(ritCount);
   }
 
   @Override
-  public void updateBulkAssignTime(long time) {
-    bulkAssignTimeHisto.add(time);
+  public void setRITCountOverThreshold(final int ritCount) {
+    ritCountOverThresholdGauge.set(ritCount);
   }
 
-  public void setRIT(int ritCount) {
-    ritGauge.set(ritCount);
+  @Override
+  public void setRITOldestAge(final long ritCount) {
+    ritOldestAgeGauge.set(ritCount);
   }
 
-  public void setRITCountOverThreshold(int ritCount) {
-    ritCountOverThresholdGauge.set(ritCount);
+  @Override
+  public void incrementOperationCounter() {
+    operationCounter.incr();
   }
 
-  public void setRITOldestAge(long ritCount) {
-    ritOldestAgeGauge.set(ritCount);
+  @Override
+  public void updateAssignTime(final long time) {
+    assignTimeHisto.add(time);
+  }
+
+  @Override
+  public void updateUnassignTime(final long time) {
+    unassignTimeHisto.add(time);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index fbb066c..64c3e53 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class);
-  private final ReentrantLock schedLock = new ReentrantLock();
-  private final Condition schedWaitCond = schedLock.newCondition();
+  private final ReentrantLock schedulerLock = new ReentrantLock();
+  private final Condition schedWaitCond = schedulerLock.newCondition();
   private boolean running = false;
 
   // TODO: metrics
@@ -88,14 +88,14 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   }
 
   protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
-    schedLock.lock();
+    schedulerLock.lock();
     try {
       enqueue(procedure, addFront);
       if (notify) {
         schedWaitCond.signal();
       }
     } finally {
-      schedLock.unlock();
+      schedulerLock.unlock();
     }
   }
 
@@ -219,11 +219,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
 
   @Override
   public void suspendEvent(final ProcedureEvent event) {
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
+    final boolean traceEnabled = LOG.isTraceEnabled();
     synchronized (event) {
       event.setReady(false);
-      if (isTraceEnabled) {
-        LOG.trace("Suspend event " + event);
+      if (traceEnabled) {
+        LOG.trace("Suspend " + event);
       }
     }
   }
@@ -235,18 +235,29 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
 
   @Override
   public void wakeEvents(final int count, final ProcedureEvent... events) {
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
+    final boolean traceEnabled = LOG.isTraceEnabled();
     schedLock();
     try {
       int waitingCount = 0;
       for (int i = 0; i < count; ++i) {
         final ProcedureEvent event = events[i];
         synchronized (event) {
-          event.setReady(true);
-          if (isTraceEnabled) {
-            LOG.trace("Wake event " + event);
+          if (!event.isReady()) {
+            // Only set ready if we were not ready; i.e. suspended. Otherwise, we double-wake
+            // on this event and down in wakeWaitingProcedures, we double decrement this
+            // finish which messes up child procedure accounting.
+            event.setReady(true);
+            if (traceEnabled) {
+              LOG.trace("Unsuspend " + event);
+            }
+            waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
+          } else {
+            ProcedureDeque q = event.getSuspendedProcedures();
+            if (q != null && !q.isEmpty()) {
+              LOG.warn("Q is not empty! size=" + q.size() + "; PROCESSING...");
+              waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
+            }
           }
-          waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
         }
       }
       wakePollIfNeeded(waitingCount);
@@ -275,6 +286,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   }
 
   protected void wakeProcedure(final Procedure procedure) {
+    if (LOG.isTraceEnabled()) LOG.trace("Wake " + procedure);
     push(procedure, /* addFront= */ true, /* notify= */false);
   }
 
@@ -282,11 +294,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   //  Internal helpers
   // ==========================================================================
   protected void schedLock() {
-    schedLock.lock();
+    schedulerLock.lock();
   }
 
   protected void schedUnlock() {
-    schedLock.unlock();
+    schedulerLock.unlock();
   }
 
   protected void wakePollIfNeeded(final int waitingCount) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 591c0d0..0184d5d 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -43,21 +45,24 @@ import com.google.common.annotations.VisibleForTesting;
  * execute() is called each time the procedure is executed.
  * it may be called multiple times in case of failure and restart, so the
  * code must be idempotent.
- * the return is a set of sub-procedures or null in case the procedure doesn't
+ *
+ * <p>The return is a set of sub-procedures or null in case the procedure doesn't
  * have sub-procedures. Once the sub-procedures are successfully completed
  * the execute() method is called again, you should think at it as a stack:
+ * <pre>
  *  -&gt; step 1
  *  ---&gt; step 2
  *  -&gt; step 1
- *
- * rollback() is called when the procedure or one of the sub-procedures is failed.
- * the rollback step is supposed to cleanup the resources created during the
- * execute() step. in case of failure and restart rollback() may be called
- * multiple times, so the code must be idempotent.
+ * </pre>
+ * <p>rollback() is called when the procedure or one of the sub-procedures
+ * has failed. the rollback step is supposed to cleanup the resources created
+ * during the execute() step. in case of failure and restart rollback() may be
+ * called multiple times, so again the code must be idempotent.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
+  private static final Log LOG = LogFactory.getLog(Procedure.class);
   public static final long NO_PROC_ID = -1;
   protected static final int NO_TIMEOUT = -1;
 
@@ -275,11 +280,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   protected StringBuilder toStringSimpleSB() {
     final StringBuilder sb = new StringBuilder();
 
-    sb.append("procId=");
+    sb.append("pid=");
     sb.append(getProcId());
 
     if (hasParent()) {
-      sb.append(", parentProcId=");
+      sb.append(", ppid=");
       sb.append(getParentProcId());
     }
 
@@ -288,14 +293,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
       sb.append(getOwner());
     }
 
-    sb.append(", state=");
+    sb.append(", procState=");
     toStringState(sb);
 
     if (hasException()) {
       sb.append(", exception=" + getException());
     }
 
-    sb.append(", ");
+    sb.append("; ");
     toStringClassDetails(sb);
 
     return sb;
@@ -344,7 +349,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
    * @param builder the string builder to use to append the proc specific information
    */
   protected void toStringClassDetails(StringBuilder builder) {
-    builder.append(getClass().getName());
+    builder.append(getClass().getSimpleName());
   }
 
   // ==========================================================================
@@ -648,6 +653,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   @InterfaceAudience.Private
   protected synchronized void setChildrenLatch(final int numChildren) {
     this.childrenLatch = numChildren;
+    if (LOG.isTraceEnabled()) {
+      LOG.info("CHILD LATCH INCREMENT SET " +
+          this.childrenLatch, new Throwable(this.toString()));
+    }
   }
 
   /**
@@ -657,15 +666,34 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   protected synchronized void incChildrenLatch() {
     // TODO: can this be inferred from the stack? I think so...
     this.childrenLatch++;
+    if (LOG.isTraceEnabled()) {
+      LOG.info("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
+    }
   }
 
   /**
    * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
    */
   @InterfaceAudience.Private
-  protected synchronized boolean childrenCountDown() {
+  private synchronized boolean childrenCountDown() {
     assert childrenLatch > 0: this;
-    return --childrenLatch == 0;
+    boolean b = --childrenLatch == 0;
+    if (LOG.isTraceEnabled()) {
+      LOG.info("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
+    }
+    return b;
+  }
+
+  /**
+   * Try to set this procedure into RUNNABLE state.
+   * Succeeds if all subprocedures/children are done.
+   * @return True if we were able to move procedure to RUNNABLE state.
+   */
+  synchronized boolean tryRunnable() {
+    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
+    boolean b = getState() == ProcedureState.WAITING && childrenCountDown();
+    if (b) setState(ProcedureState.RUNNABLE);
+    return b;
   }
 
   @InterfaceAudience.Private
@@ -732,6 +760,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
 
   /**
    * Internal method called by the ProcedureExecutor that starts the user-level code execute().
+   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
+   * skip out without changing states or releasing any locks held.
    */
   @InterfaceAudience.Private
   protected Procedure[] doExecute(final TEnvironment env)

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
index 43cce3a..adb27a8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
@@ -50,6 +50,6 @@ public class ProcedureEvent<T> {
   @Override
   public String toString() {
     return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
-        ", suspended procedures count=" + getSuspendedProcedures().size();
+        ", " + getSuspendedProcedures();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 1bb6118..b1db2dc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -32,6 +32,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.DelayQueue;
@@ -113,9 +115,11 @@ public class ProcedureExecutor<TEnvironment> {
    * Internal cleaner that removes the completed procedure results after a TTL.
    * NOTE: This is a special case handled in timeoutLoop().
    *
-   * Since the client code looks more or less like:
+   * <p>Since the client code looks more or less like:
+   * <pre>
    *   procId = master.doOperation()
    *   while (master.getProcResult(procId) == ProcInProgress);
+   * </pre>
    * The master should not throw away the proc result as soon as the procedure is done
    * but should wait a result request from the client (see executor.removeResult(procId))
    * The client will call something like master.isProcDone() or master.getProcResult()
@@ -480,10 +484,10 @@ public class ProcedureExecutor<TEnvironment> {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
-    LOG.info("Starting executor worker threads=" + corePoolSize);
+    LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize);
 
     // Create the Thread Group for the executors
-    threadGroup = new ThreadGroup("ProcedureExecutor");
+    threadGroup = new ThreadGroup("ProcExecThrdGrp");
 
     // Create the timeout executor
     timeoutExecutor = new TimeoutExecutorThread(threadGroup);
@@ -1077,13 +1081,16 @@ public class ProcedureExecutor<TEnvironment> {
     final Long rootProcId = getRootProcedureId(proc);
     if (rootProcId == null) {
       // The 'proc' was ready to run but the root procedure was rolledback
+      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
       executeRollback(proc);
       return;
     }
 
     final RootProcedureState procStack = rollbackStack.get(rootProcId);
-    if (procStack == null) return;
-
+    if (procStack == null) {
+      LOG.warn("RootProcedureState is null for " + proc.getProcId());
+      return;
+    }
     do {
       // Try to acquire the execution
       if (!procStack.acquire(proc)) {
@@ -1125,16 +1132,21 @@ public class ProcedureExecutor<TEnvironment> {
 
       // Execute the procedure
       assert proc.getState() == ProcedureState.RUNNABLE : proc;
-      switch (acquireLock(proc)) {
+      // Note that lock is NOT about concurrency but rather about ensuring
+      // ownership of a procedure of an entity such as a region or table.
+      LockState lockState = acquireLock(proc);
+      switch (lockState) {
         case LOCK_ACQUIRED:
           execProcedure(procStack, proc);
           releaseLock(proc, false);
           break;
         case LOCK_YIELD_WAIT:
+          LOG.info(lockState + " " + proc);
           scheduler.yield(proc);
           break;
         case LOCK_EVENT_WAIT:
           // someone will wake us up when the lock is available
+          LOG.debug(lockState + " " + proc);
           break;
         default:
           throw new UnsupportedOperationException();
@@ -1150,10 +1162,7 @@ public class ProcedureExecutor<TEnvironment> {
       if (proc.isSuccess()) {
         // update metrics on finishing the procedure
         proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
-        }
+        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
         // Finalize the procedure state
         if (proc.getProcId() == rootProcId) {
           procedureFinished(proc);
@@ -1178,7 +1187,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void releaseLock(final Procedure proc, final boolean force) {
     final TEnvironment env = getEnvironment();
-    // for how the framework works, we know that we will always have the lock
+    // For how the framework works, we know that we will always have the lock
     // when we call releaseLock(), so we can avoid calling proc.hasLock()
     if (force || !proc.holdLock(env)) {
       proc.doReleaseLock(env);
@@ -1193,6 +1202,8 @@ public class ProcedureExecutor<TEnvironment> {
   private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
     final Procedure rootProc = procedures.get(rootProcId);
     RemoteProcedureException exception = rootProc.getException();
+    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
+    // rolling back because the subprocedure does. Clarify.
     if (exception == null) {
       exception = procStack.getException();
       rootProc.setFailure(exception);
@@ -1269,7 +1280,7 @@ public class ProcedureExecutor<TEnvironment> {
       return LockState.LOCK_YIELD_WAIT;
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
-      LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
+      LOG.fatal("CODE-BUG: Uncaught runtime exception for " + proc, e);
     }
 
     // allows to kill the executor before something is stored to the wal.
@@ -1305,29 +1316,54 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   /**
-   * Executes the specified procedure
-   *  - calls the doExecute() of the procedure
-   *  - if the procedure execution didn't fail (e.g. invalid user input)
-   *     - ...and returned subprocedures
-   *        - the subprocedures are initialized.
-   *        - the subprocedures are added to the store
-   *        - the subprocedures are added to the runnable queue
-   *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
-   *     - ...if there are no subprocedure
-   *        - the procedure completed successfully
-   *        - if there is a parent (WAITING)
-   *            - the parent state will be set to RUNNABLE
-   *  - in case of failure
-   *    - the store is updated with the new state
-   *    - the executor (caller of this method) will start the rollback of the procedure
+   * Executes <code>procedure</code>
+   * <ul>
+   *  <li>Calls the doExecute() of the procedure
+   *  <li>If the procedure execution didn't fail (i.e. valid user input)
+   *  <ul>
+   *    <li>...and returned subprocedures
+   *    <ul><li>The subprocedures are initialized.
+   *      <li>The subprocedures are added to the store
+   *      <li>The subprocedures are added to the runnable queue
+   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
+   *    </ul>
+   *    </li>
+   *   <li>...if there are no subprocedures
+   *    <ul><li>the procedure completed successfully
+   *      <li>if there is a parent (WAITING)
+   *      <li>the parent state will be set to RUNNABLE
+   *    </ul>
+   *   </li>
+   *  </ul>
+   *  </li>
+   *  <li>In case of failure
+   *  <ul>
+   *    <li>The store is updated with the new state</li>
+   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
+   *  </ul>
+   *  </li>
+   *  </ul>
    */
   private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
     Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
 
-    // Execute the procedure
+    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
+    // The exception is caught below and then we hurry to the exit without disturbing state. The
+    // idea is that the processing of this procedure will be unsuspended later by an external event
+    // such the report of a region open. TODO: Currently, its possible for two worker threads
+    // to be working on the same procedure concurrently (locking in procedures is NOT about
+    // concurrency but about tying an entity to a procedure; i.e. a region to a particular
+    // procedure instance). This can make for issues if both threads are changing state.
+    // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+    // in RegionTransitionProcedure#reportTransition for example of Procedure putting
+    // itself back on the scheduler making it possible for two threads running against
+    // the one Procedure. Might be ok if they are both doing different, idempotent sections.
     boolean suspended = false;
+
+    // Whether to 're-' -execute; run through the loop again.
     boolean reExecute = false;
-    Procedure[] subprocs = null;
+
+    Procedure<?>[] subprocs = null;
     do {
       reExecute = false;
       try {
@@ -1336,14 +1372,18 @@ public class ProcedureExecutor<TEnvironment> {
           subprocs = null;
         }
       } catch (ProcedureSuspendedException e) {
+        LOG.info("Suspended " + procedure);
         suspended = true;
       } catch (ProcedureYieldException e) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield " + procedure + ": " + e.getMessage());
+          LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
         }
         scheduler.yield(procedure);
         return;
       } catch (InterruptedException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
+        }
         handleInterruptedException(procedure, e);
         scheduler.yield(procedure);
         return;
@@ -1357,14 +1397,22 @@ public class ProcedureExecutor<TEnvironment> {
       if (!procedure.isFailed()) {
         if (subprocs != null) {
           if (subprocs.length == 1 && subprocs[0] == procedure) {
-            // quick-shortcut for a state machine like procedure
+            // Procedure returned itself.
+            // Quick-shortcut for a state machine like procedure
             subprocs = null;
             reExecute = true;
+            LOG.info("Short-circuit to rexecute for pid=" + procedure.getProcId());
           } else {
             // yield the current procedure, and make the subprocedure runnable
             subprocs = initializeChildren(procStack, procedure, subprocs);
+            LOG.info("Initialized subprocedures=" +
+                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
+                collect(Collectors.toList()).toString());
           }
         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Added to timeoutExecutor " + procedure);
+          }
           timeoutExecutor.add(procedure);
         } else if (!suspended) {
           // No subtask, so we are done
@@ -1388,12 +1436,13 @@ public class ProcedureExecutor<TEnvironment> {
       // executor thread to stop. The statement following the method call below seems to check if
       // store is not running, to prevent scheduling children procedures, re-execution or yield
       // of this procedure. This may need more scrutiny and subsequent cleanup in future
-      // Commit the transaction
+      //
+      // Commit the transaction even if a suspend (state may have changed). Note this append
+      // can take a bunch of time to complete.
       updateStoreOnExec(procStack, procedure, subprocs);
 
       // if the store is not running we are aborting
       if (!store.isRunning()) return;
-
       // if the procedure is kind enough to pass the slot to someone else, yield
       if (procedure.isRunnable() && !suspended &&
           procedure.isYieldAfterExecutionStep(getEnvironment())) {
@@ -1403,14 +1452,14 @@ public class ProcedureExecutor<TEnvironment> {
 
       assert (reExecute && subprocs == null) || !reExecute;
     } while (reExecute);
-
     // Submit the new subprocedures
     if (subprocs != null && !procedure.isFailed()) {
       submitChildrenProcedures(subprocs);
     }
 
-    // if the procedure is complete and has a parent, count down the children latch
-    if (procedure.isFinished() && procedure.hasParent()) {
+    // if the procedure is complete and has a parent, count down the children latch.
+    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
+    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
       countDownChildren(procStack, procedure);
     }
   }
@@ -1469,17 +1518,16 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // If this procedure is the last child awake the parent procedure
-    final boolean traceEnabled = LOG.isTraceEnabled();
-    if (traceEnabled) {
-      LOG.trace(parent + " child is done: " + procedure);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished subprocedure " + procedure);
     }
-
-    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
-      parent.setState(ProcedureState.RUNNABLE);
+    if (parent.tryRunnable()) {
+      // If we succeeded in making the parent runnable -- i.e. all of its
+      // children have completed, move parent to front of the queue.
       store.update(parent);
       scheduler.addFront(parent);
-      if (traceEnabled) {
-        LOG.trace(parent + " all the children finished their work, resume.");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Finished ALL subprocedures of " + parent + "; resume.");
       }
       return;
     }
@@ -1571,7 +1619,7 @@ public class ProcedureExecutor<TEnvironment> {
     private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
 
     public WorkerThread(final ThreadGroup group) {
-      super(group, "ProcExecWorker-" + workerId.incrementAndGet());
+      super(group, "ProcExecWrkr-" + workerId.incrementAndGet());
     }
 
     @Override
@@ -1583,24 +1631,38 @@ public class ProcedureExecutor<TEnvironment> {
     public void run() {
       final boolean traceEnabled = LOG.isTraceEnabled();
       long lastUpdate = EnvironmentEdgeManager.currentTime();
-      while (isRunning() && keepAlive(lastUpdate)) {
-        final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
-        if (procedure == null) continue;
-
-        store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
-        executionStartTime.set(EnvironmentEdgeManager.currentTime());
-        try {
-          if (traceEnabled) {
-            LOG.trace("Trying to start the execution of " + procedure);
+      try {
+        while (isRunning() && keepAlive(lastUpdate)) {
+          final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+          if (procedure == null) continue;
+          int activeCount = activeExecutorCount.incrementAndGet();
+          int runningCount = store.setRunningProcedureCount(activeCount);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Run pid=" + procedure.getProcId() +
+                " current=" + runningCount + ", active=" + activeCount);
+          }
+          executionStartTime.set(EnvironmentEdgeManager.currentTime());
+          try {
+            executeProcedure(procedure);
+          } catch (AssertionError e) {
+            LOG.info("ASSERT pid=" + procedure.getProcId(), e);
+            throw e;
+          } finally {
+            activeCount = activeExecutorCount.decrementAndGet();
+            runningCount = store.setRunningProcedureCount(activeCount);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Done pid=" + procedure.getProcId() +
+                  " current=" + runningCount + ", active=" + activeCount);
+            }
+            lastUpdate = EnvironmentEdgeManager.currentTime();
+            executionStartTime.set(Long.MAX_VALUE);
           }
-          executeProcedure(procedure);
-        } finally {
-          store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
-          lastUpdate = EnvironmentEdgeManager.currentTime();
-          executionStartTime.set(Long.MAX_VALUE);
         }
+      } catch (Throwable t) {
+        LOG.warn("Worker terminating because....", t);
+      } finally {
+        LOG.debug("Worker terminated.");
       }
-      LOG.debug("Worker thread terminated " + this);
       workerThreads.remove(this);
     }
 
@@ -1617,14 +1679,15 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  // ==========================================================================
-  //  Timeout Thread
-  // ==========================================================================
+  /**
+   * Runs task on a period such as check for stuck workers.
+   * @see InlineChore
+   */
   private final class TimeoutExecutorThread extends StoppableThread {
     private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
 
     public TimeoutExecutorThread(final ThreadGroup group) {
-      super(group, "ProcedureTimeoutExecutor");
+      super(group, "ProcExecTimeout");
     }
 
     @Override
@@ -1634,7 +1697,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     @Override
     public void run() {
-      final boolean isTraceEnabled = LOG.isTraceEnabled();
+      final boolean traceEnabled = LOG.isTraceEnabled();
       while (isRunning()) {
         final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
         if (task == null || task == DelayedUtil.DELAYED_POISON) {
@@ -1643,8 +1706,8 @@ public class ProcedureExecutor<TEnvironment> {
           continue;
         }
 
-        if (isTraceEnabled) {
-          LOG.trace("Trying to start the execution of " + task);
+        if (traceEnabled) {
+          LOG.trace("Executing " + task);
         }
 
         // execute the task
@@ -1665,6 +1728,8 @@ public class ProcedureExecutor<TEnvironment> {
 
     public void add(final Procedure procedure) {
       assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
+      LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
+          ", timestamp=" + procedure.getTimeoutTimestamp());
       queue.add(new DelayedProcedure(procedure));
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
index bdced10..48bb7d1 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Special procedure used as a chore.
- * instead of bringing the Chore class in (dependencies reason),
+ * Instead of bringing the Chore class in (dependencies reason),
  * we reuse the executor timeout thread for this special case.
  *
  * The assumption is that procedure is used as hook to dispatch other procedures

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
new file mode 100644
index 0000000..8d5ff3c
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
+ * count threshold. Creates its own threadpool to run RPCs with timeout.
+ * <ul>
+ * <li>Each server queue has a dispatch buffer</li>
+ * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
+ * </ul>
+ * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done,
+ * call {@link #stop()}.
+ */
+@InterfaceAudience.Private
+public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
+  private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class);
+
+  public static final String THREAD_POOL_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.threadpool.size";
+  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
+
+  public static final String DISPATCH_DELAY_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.delay.msec";
+  private static final int DEFAULT_DISPATCH_DELAY = 150;
+
+  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.max.queue.size";
+  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
+      new ConcurrentHashMap<TRemote, BufferNode>();
+
+  private final int operationDelay;
+  private final int queueMaxSize;
+  private final int corePoolSize;
+
+  private TimeoutExecutorThread timeoutExecutor;
+  private ThreadPoolExecutor threadPool;
+
+  protected RemoteProcedureDispatcher(Configuration conf) {
+    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
+    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
+    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
+  }
+
+  public boolean start() {
+    if (running.getAndSet(true)) {
+      LOG.warn("Already running");
+      return false;
+    }
+
+    LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize +
+      ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay);
+
+    // Create the timeout executor
+    timeoutExecutor = new TimeoutExecutorThread();
+    timeoutExecutor.start();
+
+    // Create the thread pool that will execute RPCs
+    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
+      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
+          getUncaughtExceptionHandler()));
+    return true;
+  }
+
+  public boolean stop() {
+    if (!running.getAndSet(false)) {
+      return false;
+    }
+
+    LOG.info("Stopping procedure remote dispatcher");
+
+    // send stop signals
+    timeoutExecutor.sendStopSignal();
+    threadPool.shutdownNow();
+    return true;
+  }
+
+  public void join() {
+    assert !running.get() : "expected not running";
+
+    // wait the timeout executor
+    timeoutExecutor.awaitTermination();
+    timeoutExecutor = null;
+
+    // wait for the thread pool to terminate
+    threadPool.shutdownNow();
+    try {
+      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        LOG.warn("Waiting for thread-pool to terminate");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for thread-pool termination", e);
+    }
+  }
+
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        LOG.warn("Failed to execute remote procedures " + t.getName(), e);
+      }
+    };
+  }
+
+  // ============================================================================================
+  //  Node Helpers
+  // ============================================================================================
+  /**
+   * Add a node that will be able to execute remote procedures
+   * @param key the node identifier
+   */
+  public void addNode(final TRemote key) {
+    assert key != null: "Tried to add a node with a null key";
+    final BufferNode newNode = new BufferNode(key);
+    nodeMap.putIfAbsent(key, newNode);
+  }
+
+  /**
+   * Add a remote rpc. Be sure to check result for successful add.
+   * @param key the node identifier
+   * @return True if we successfully added the operation.
+   */
+  public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) {
+    assert key != null : "found null key for node";
+    BufferNode node = nodeMap.get(key);
+    if (node == null) {
+      return false;
+    }
+    node.add(rp);
+    // Check our node still in the map; could have been removed by #removeNode.
+    return nodeMap.contains(node);
+  }
+
+  /**
+   * Remove a remote node
+   * @param key the node identifier
+   */
+  public boolean removeNode(final TRemote key) {
+    final BufferNode node = nodeMap.remove(key);
+    if (node == null) return false;
+    node.abortOperationsInQueue();
+    return true;
+  }
+
+  // ============================================================================================
+  //  Task Helpers
+  // ============================================================================================
+  protected Future<Void> submitTask(Callable<Void> task) {
+    return threadPool.submit(task);
+  }
+
+  protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) {
+    final FutureTask<Void> futureTask = new FutureTask(task);
+    timeoutExecutor.add(new DelayedTask(futureTask, delay, unit));
+    return futureTask;
+  }
+
+  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
+  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
+
+  /**
+   * Data structure with reference to remote operation.
+   */
+  public static abstract class RemoteOperation {
+    private final RemoteProcedure remoteProcedure;
+
+    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
+      this.remoteProcedure = remoteProcedure;
+    }
+
+    public RemoteProcedure getRemoteProcedure() {
+      return remoteProcedure;
+    }
+  }
+
+  /**
+   * Remote procedure reference.
+   * @param <TEnv> the environment type passed through to the remote callbacks
+   * @param <TRemote> the type of the remote node identifier
+   */
+  public interface RemoteProcedure<TEnv, TRemote> {
+    RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
+    void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
+    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
+  }
+
+  /**
+   * Account of what procedures are running on remote node.
+   * @param <TEnv> the environment type passed through to the remote callbacks
+   * @param <TRemote> the type of the remote node identifier
+   */
+  public interface RemoteNode<TEnv, TRemote> {
+    TRemote getKey();
+    void add(RemoteProcedure<TEnv, TRemote> operation);
+    void dispatch();
+  }
+
+  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
+      final TRemote remote, final Set<RemoteProcedure> operations) {
+    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
+    for (RemoteProcedure proc: operations) {
+      RemoteOperation operation = proc.remoteCallBuild(env, remote);
+      requestByType.put(operation.getClass(), operation);
+    }
+    return requestByType;
+  }
+
+  protected <T extends RemoteOperation> List<T> fetchType(
+      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
+    return (List<T>)requestByType.removeAll(type);
+  }
+
+  // ============================================================================================
+  //  Timeout Helpers
+  // ============================================================================================
+  private final class TimeoutExecutorThread extends Thread {
+    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
+
+    public TimeoutExecutorThread() {
+      super("ProcedureDispatcherTimeoutThread");
+    }
+
+    @Override
+    public void run() {
+      while (running.get()) {
+        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+        if (task == null || task == DelayedUtil.DELAYED_POISON) {
+          // the executor may be shutting down, and the task is just the shutdown request
+          continue;
+        }
+        if (task instanceof DelayedTask) {
+          threadPool.execute(((DelayedTask)task).getObject());
+        } else {
+          ((BufferNode)task).dispatch();
+        }
+      }
+    }
+
+    public void add(final DelayedWithTimeout delayed) {
+      queue.add(delayed);
+    }
+
+    public void remove(final DelayedWithTimeout delayed) {
+      queue.remove(delayed);
+    }
+
+    public void sendStopSignal() {
+      queue.add(DelayedUtil.DELAYED_POISON);
+    }
+
+    public void awaitTermination() {
+      try {
+        final long startTime = EnvironmentEdgeManager.currentTime();
+        for (int i = 0; isAlive(); ++i) {
+          sendStopSignal();
+          join(250);
+          if (i > 0 && (i % 8) == 0) {
+            LOG.warn("Waiting termination of thread " + getName() + ", " +
+              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn(getName() + " join wait got interrupted", e);
+      }
+    }
+  }
+
+  // ============================================================================================
+  //  Internals Helpers
+  // ============================================================================================
+
+  /**
+   * Node that contains a set of RemoteProcedures
+   */
+  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
+      implements RemoteNode<TEnv, TRemote> {
+    private Set<RemoteProcedure> operations;
+
+    protected BufferNode(final TRemote key) {
+      super(key, 0);
+    }
+
+    public TRemote getKey() {
+      return getObject();
+    }
+
+    public synchronized void add(final RemoteProcedure operation) {
+      if (this.operations == null) {
+        this.operations = new HashSet<>();
+        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
+        timeoutExecutor.add(this);
+      }
+      this.operations.add(operation);
+      if (this.operations.size() > queueMaxSize) {
+        timeoutExecutor.remove(this);
+        dispatch();
+      }
+    }
+
+    public synchronized void dispatch() {
+      if (operations != null) {
+        remoteDispatch(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    public synchronized void abortOperationsInQueue() {
+      if (operations != null) {
+        abortPendingOperations(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + ", operations=" + this.operations;
+    }
+  }
+
+  /**
+   * Delayed object that holds a FutureTask.
+   * Used to submit something later to the thread-pool.
+   */
+  private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> {
+    public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) {
+      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
index 1a84070..64bb278 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -27,12 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData;
 
 /**
- * A SequentialProcedure describes one step in a procedure chain.
+ * A SequentialProcedure describes one step in a procedure chain:
+ * <pre>
  *   -&gt; Step 1 -&gt; Step 2 -&gt; Step 3
- *
+ * </pre>
  * The main difference from a base Procedure is that the execute() of a
- * SequentialProcedure will be called only once, there will be no second
- * execute() call once the child are finished. which means once the child
+ * SequentialProcedure will be called only once; there will be no second
+ * execute() call once the children are finished, which means once the children
  * of a SequentialProcedure are completed the SequentialProcedure is completed too.
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index ea2a41f..0008c16 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,7 +57,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
   private int stateCount = 0;
   private int[] states = null;
 
-  private ArrayList<Procedure> subProcList = null;
+  private List<Procedure<?>> subProcList = null;
 
   protected enum Flow {
     HAS_MORE_STATE,
@@ -131,12 +132,15 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
    * Add a child procedure to execute
    * @param subProcedure the child procedure
    */
-  protected void addChildProcedure(Procedure... subProcedure) {
+  protected void addChildProcedure(Procedure<?>... subProcedure) {
+    if (subProcedure == null) return;
+    final int len = subProcedure.length;
+    if (len == 0) return;
     if (subProcList == null) {
-      subProcList = new ArrayList<>(subProcedure.length);
+      subProcList = new ArrayList<>(len);
     }
-    for (int i = 0; i < subProcedure.length; ++i) {
-      Procedure proc = subProcedure[i];
+    for (int i = 0; i < len; ++i) {
+      Procedure<?> proc = subProcedure[i];
       if (!proc.hasOwner()) proc.setOwner(getOwner());
       subProcList.add(proc);
     }
@@ -148,21 +152,17 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
     updateTimestamp();
     try {
       if (!hasMoreState() || isFailed()) return null;
-
       TState state = getCurrentState();
       if (stateCount == 0) {
         setNextState(getStateId(state));
       }
-
       stateFlow = executeFromState(env, state);
       if (!hasMoreState()) setNextState(EOF_STATE);
-
-      if (subProcList != null && subProcList.size() != 0) {
+      if (subProcList != null && !subProcList.isEmpty()) {
         Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
         subProcList = null;
         return subProcedures;
       }
-
       return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
     } finally {
       updateTimestamp();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index c03e326..9e53f42 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
-  public void setRunningProcedureCount(final int count) {
-    // no-op
+  public int setRunningProcedureCount(final int count) {
+    return count;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 385cedb..a690c81 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -153,8 +153,9 @@ public interface ProcedureStore {
   /**
    * Set the number of procedure running.
    * This can be used, for example, by the store to know how long to wait before a sync.
+   * @return how many procedures are running (may not be same as <code>count</code>).
    */
-  void setRunningProcedureCount(int count);
+  int setRunningProcedureCount(int count);
 
   /**
    * Acquire the lease for the procedure store.


Mime
View raw message