hbase-commits mailing list archives

From st...@apache.org
Subject [17/27] hbase git commit: Revert "HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)" Revert a mistaken commit!!!
Date Thu, 25 May 2017 06:32:21 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
new file mode 100644
index 0000000..929cd4e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Base class used for bulk assigning and unassigning regions.
+ * Encapsulates a fixed-size thread pool of executors to run assignment/unassignment.
+ * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and
+ * {@link #waitUntilDone(long)}.  The default
+ * {@link #getUncaughtExceptionHandler()} aborts the hosting
+ * Server.
+ */
+@InterfaceAudience.Private
+public abstract class BulkAssigner {
+  protected final Server server;
+
+  /**
+   * @param server An instance of Server
+   */
+  public BulkAssigner(final Server server) {
+    this.server = server;
+  }
+
+  /**
+   * @return What to use for a thread prefix when executor runs.
+   */
+  protected String getThreadNamePrefix() {
+    return this.server.getServerName() + "-" + this.getClass().getName(); 
+  }
+
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        // Abort if exception of any kind.
+        server.abort("Uncaught exception in " + t.getName(), e);
+      }
+    };
+  }
+
+  protected int getThreadCount() {
+    return this.server.getConfiguration().
+      getInt("hbase.bulk.assignment.threadpool.size", 20);
+  }
+
+  protected long getTimeoutOnRIT() {
+    return this.server.getConfiguration().
+      getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000);
+  }
+
+  protected abstract void populatePool(
+      final java.util.concurrent.ExecutorService pool) throws IOException;
+
+  public boolean bulkAssign() throws InterruptedException, IOException {
+    return bulkAssign(true);
+  }
+
+  /**
+   * Run the bulk assign.
+   * 
+   * @param sync
+   *          Whether to assign synchronously.
+   * @throws InterruptedException
+   * @return True if done.
+   * @throws IOException
+   */
+  public boolean bulkAssign(boolean sync) throws InterruptedException,
+      IOException {
+    boolean result = false;
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setDaemon(true);
+    builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
+    builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
+    int threadCount = getThreadCount();
+    java.util.concurrent.ExecutorService pool =
+      Executors.newFixedThreadPool(threadCount, builder.build());
+    try {
+      populatePool(pool);
+      // How long to wait on empty regions-in-transition.  If we timeout, the
+      // RIT monitor should do fixup.
+      if (sync) result = waitUntilDone(getTimeoutOnRIT());
+    } finally {
+      // We're done with the pool.  It'll exit when it has finished all queued work.
+      pool.shutdown();
+    }
+    return result;
+  }
+
+  /**
+   * Wait until bulk assign is done.
+   * @param timeout How long to wait.
+   * @throws InterruptedException
+   * @return True if the condition we were waiting on happened.
+   */
+  protected abstract boolean waitUntilDone(final long timeout)
+  throws InterruptedException;
+}
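
The contract above boils down to: subclass BulkAssigner, submit one Runnable per unit of work from populatePool(), and block in waitUntilDone() until the work completes or the timeout lapses. A minimal sketch of a conforming subclass follows; NoopBulkAssigner and its latch are hypothetical, not part of this patch.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.master.BulkAssigner;

// Hypothetical subclass illustrating the BulkAssigner contract.
class NoopBulkAssigner extends BulkAssigner {
  private final List<HRegionInfo> regions;
  private final CountDownLatch done;

  NoopBulkAssigner(Server server, List<HRegionInfo> regions) {
    super(server);
    this.regions = regions;
    this.done = new CountDownLatch(regions.size());
  }

  @Override
  protected void populatePool(ExecutorService pool) throws IOException {
    for (final HRegionInfo hri : regions) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          // A real subclass would assign or unassign hri here.
          done.countDown();
        }
      });
    }
  }

  @Override
  protected boolean waitUntilDone(long timeout) throws InterruptedException {
    // True only if every submitted task finished within the timeout.
    return done.await(timeout, TimeUnit.MILLISECONDS);
  }
}

A caller invokes bulkAssign(), which builds the daemon thread pool, runs populatePool() against it, and, when sync is true, gives waitUntilDone() up to getTimeoutOnRIT() milliseconds before shutting the pool down.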

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
new file mode 100644
index 0000000..d8c511e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
@@ -0,0 +1,136 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Performs bulk reopen of the list of regions provided to it.
+ */
+@InterfaceAudience.Private
+public class BulkReOpen extends BulkAssigner {
+  private final Map<ServerName, List<HRegionInfo>> rsToRegions;
+  private final AssignmentManager assignmentManager;
+  private static final Log LOG = LogFactory.getLog(BulkReOpen.class);
+
+  public BulkReOpen(final Server server,
+      final Map<ServerName, List<HRegionInfo>> serverToRegions,
+    final AssignmentManager am) {
+    super(server);
+    this.assignmentManager = am;
+    this.rsToRegions = serverToRegions;
+  }
+
+  /**
+   * Unassign all regions, so that they go through the regular region
+   * assignment flow (in assignment manager) and are re-opened.
+   */
+  @Override
+  protected void populatePool(ExecutorService pool) {
+    LOG.debug("Creating threads for each region server ");
+    for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions
+        .entrySet()) {
+      final List<HRegionInfo> hris = e.getValue();
+      // add plans for the regions that need to be reopened
+      Map<String, RegionPlan> plans = new HashMap<>();
+      for (HRegionInfo hri : hris) {
+        RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri);
+        plans.put(hri.getEncodedName(), reOpenPlan);
+      }
+      assignmentManager.addPlans(plans);
+      pool.execute(new Runnable() {
+        public void run() {
+          try {
+            unassign(hris);
+          } catch (Throwable t) {
+            LOG.warn("Failed bulking re-open " + hris.size()
+              + " region(s)", t);
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Regions are reopened asynchronously, so this always returns true immediately.
+   * @return true
+   */
+  @Override
+  protected boolean waitUntilDone(long timeout) {
+    return true;
+  }
+
+  /**
+   * Configuration knob "hbase.bulk.reopen.threadpool.size" sets the number of
+   * regions that can be reopened concurrently. The master never creates more
+   * threads than there are region servers. If the property is not defined, it
+   * defaults to 20.
+   */
+  protected int getThreadCount() {
+    int defaultThreadCount = super.getThreadCount();
+    return this.server.getConfiguration().getInt(
+        "hbase.bulk.reopen.threadpool.size", defaultThreadCount);
+  }
+
+  public boolean bulkReOpen() throws InterruptedException, IOException {
+    return bulkAssign();
+  }
+
+  /**
+   * Unassign the list of regions. Configuration knob:
+   * "hbase.bulk.waitbetween.reopen" is the number of milliseconds to
+   * wait before unassigning another region from this region server.
+   *
+   * @param regions
+   * @throws InterruptedException
+   */
+  private void unassign(
+      List<HRegionInfo> regions) throws InterruptedException {
+    int waitTime = this.server.getConfiguration().getInt(
+        "hbase.bulk.waitbetween.reopen", 0);
+    RegionStates regionStates = assignmentManager.getRegionStates();
+    for (HRegionInfo region : regions) {
+      if (server.isStopped()) {
+        return;
+      }
+      if (regionStates.isRegionInTransition(region)) {
+        continue;
+      }
+      assignmentManager.unassign(region);
+      while (regionStates.isRegionInTransition(region)
+          && !server.isStopped()) {
+        regionStates.waitForUpdate(100);
+      }
+      if (waitTime > 0 && !server.isStopped()) {
+        Thread.sleep(waitTime);
+      }
+    }
+  }
+}
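
As a usage sketch: a call site groups the regions to reopen by their current hosting server and hands the map over. The helper below is hypothetical (not part of this patch); only the BulkReOpen constructor and bulkReOpen() call come from the code above.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkReOpen;

class BulkReOpenExample {
  // Hypothetical helper: reopen the given regions, one pool worker per server.
  static boolean reopen(Server master, AssignmentManager am,
      ServerName serverName, List<HRegionInfo> regionsOnServer)
      throws IOException, InterruptedException {
    Map<ServerName, List<HRegionInfo>> serverToRegions = new HashMap<>();
    serverToRegions.put(serverName, regionsOnServer);
    // Returns true immediately: waitUntilDone() is a no-op and the
    // unassign/re-open work proceeds asynchronously in the pool.
    return new BulkReOpen(master, serverToRegions, am).bulkReOpen();
  }
}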

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 4775a0a..affd44c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,15 +39,11 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.GCMergedRegionsProcedure;
-import org.apache.hadoop.hbase.master.assignment.GCRegionProcedure;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -55,8 +52,6 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Triple;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A janitor for the catalog tables.  Scans the <code>hbase:meta</code> catalog
  * table on a period looking for unused regions to garbage collect.
@@ -69,7 +64,6 @@ public class CatalogJanitor extends ScheduledChore {
   private final AtomicBoolean enabled = new AtomicBoolean(true);
   private final MasterServices services;
   private final Connection connection;
-  // PID of the last Procedure launched herein. Keep around for Tests.
 
   CatalogJanitor(final MasterServices services) {
     super("CatalogJanitor-" + services.getServerName().toShortString(), services,
@@ -118,13 +112,10 @@ public class CatalogJanitor extends ScheduledChore {
           && !this.services.isInMaintenanceMode()
           && am != null
           && am.isFailoverCleanupDone()
-          && !am.hasRegionsInTransition()) {
+          && am.getRegionStates().getRegionsInTransition().isEmpty()) {
         scan();
       } else {
-        LOG.warn("CatalogJanitor is disabled! Enabled=" + this.enabled.get() +
-            ", maintenanceMode=" + this.services.isInMaintenanceMode() +
-            ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) +
-            ", hasRIT=" + (am != null && am.hasRegionsInTransition()));
+        LOG.warn("CatalogJanitor disabled! Not running scan.");
       }
     } catch (IOException e) {
       LOG.warn("Failed scan of catalog table", e);
@@ -176,7 +167,6 @@ public class CatalogJanitor extends ScheduledChore {
           // Another table, stop scanning
           return false;
         }
-        if (LOG.isTraceEnabled()) LOG.trace("" + info + " IS-SPLIT_PARENT=" + info.isSplitParent());
         if (info.isSplitParent()) splitParents.put(info, r);
         if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
           mergedRegions.put(info, r);
@@ -197,6 +187,8 @@ public class CatalogJanitor extends ScheduledChore {
    * If merged region no longer holds reference to the merge regions, archive
    * merge region on hdfs and perform deleting references in hbase:meta
    * @param mergedRegion
+   * @param regionA
+   * @param regionB
    * @return true if we delete references in merged region on hbase:meta and archive
    *         the files on the file system
    * @throws IOException
@@ -215,12 +207,18 @@ public class CatalogJanitor extends ScheduledChore {
       LOG.warn("Merged region does not exist: " + mergedRegion.getEncodedName());
     }
     if (regionFs == null || !regionFs.hasReferences(htd)) {
-      LOG.debug("Deleting region " + regionA.getShortNameToLog() + " and "
-          + regionB.getShortNameToLog()
+      LOG.debug("Deleting region " + regionA.getRegionNameAsString() + " and "
+          + regionB.getRegionNameAsString()
           + " from fs because merged region no longer holds references");
-      ProcedureExecutor<MasterProcedureEnv> pe = this.services.getMasterProcedureExecutor();
-      pe.submitProcedure(new GCMergedRegionsProcedure(pe.getEnvironment(),
-          mergedRegion, regionA, regionB));
+      HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionA);
+      HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionB);
+      MetaTableAccessor.deleteMergeQualifiers(services.getConnection(), mergedRegion);
+      services.getServerManager().removeRegion(regionA);
+      services.getServerManager().removeRegion(regionB);
+      FavoredNodesManager fnm = this.services.getFavoredNodesManager();
+      if (fnm != null) {
+        fnm.deleteFavoredNodesForRegions(Lists.newArrayList(regionA, regionB));
+      }
       return true;
     }
     return false;
@@ -229,21 +227,22 @@ public class CatalogJanitor extends ScheduledChore {
   /**
    * Run janitorial scan of catalog <code>hbase:meta</code> table looking for
    * garbage to collect.
-   * @return number of archiving jobs started.
+   * @return number of cleaned regions
    * @throws IOException
    */
   int scan() throws IOException {
-    int result = 0;
     try {
       if (!alreadyRunning.compareAndSet(false, true)) {
         LOG.debug("CatalogJanitor already running");
-        return result;
+        return 0;
       }
       Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple =
         getMergedRegionsAndSplitParents();
+      int count = scanTriple.getFirst();
       /**
        * clean merge regions first
        */
+      int mergeCleaned = 0;
       Map<HRegionInfo, Result> mergedRegions = scanTriple.getSecond();
       for (Map.Entry<HRegionInfo, Result> e : mergedRegions.entrySet()) {
         if (this.services.isInMaintenanceMode()) {
@@ -256,13 +255,13 @@ public class CatalogJanitor extends ScheduledChore {
         HRegionInfo regionB = p.getSecond();
         if (regionA == null || regionB == null) {
           LOG.warn("Unexpected references regionA="
-              + (regionA == null ? "null" : regionA.getShortNameToLog())
+              + (regionA == null ? "null" : regionA.getRegionNameAsString())
               + ",regionB="
-              + (regionB == null ? "null" : regionB.getShortNameToLog())
-              + " in merged region " + e.getKey().getShortNameToLog());
+              + (regionB == null ? "null" : regionB.getRegionNameAsString())
+              + " in merged region " + e.getKey().getRegionNameAsString());
         } else {
           if (cleanMergeRegion(e.getKey(), regionA, regionB)) {
-            result++;
+            mergeCleaned++;
           }
         }
       }
@@ -272,6 +271,7 @@ public class CatalogJanitor extends ScheduledChore {
       Map<HRegionInfo, Result> splitParents = scanTriple.getThird();
 
       // Now work on our list of found parents. See if any we can clean up.
+      int splitCleaned = 0;
       // regions whose parents are still around
       HashSet<String> parentNotCleaned = new HashSet<>();
       for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
@@ -281,8 +281,8 @@ public class CatalogJanitor extends ScheduledChore {
         }
 
         if (!parentNotCleaned.contains(e.getKey().getEncodedName()) &&
-              cleanParent(e.getKey(), e.getValue())) {
-            result++;
+            cleanParent(e.getKey(), e.getValue())) {
+          splitCleaned++;
         } else {
           // We could not clean the parent, so it's daughters should not be
           // cleaned either (HBASE-6160)
@@ -292,7 +292,16 @@ public class CatalogJanitor extends ScheduledChore {
           parentNotCleaned.add(daughters.getSecond().getEncodedName());
         }
       }
-      return result;
+      if ((mergeCleaned + splitCleaned) != 0) {
+        LOG.info("Scanned " + count + " catalog row(s), gc'd " + mergeCleaned
+            + " unreferenced merged region(s) and " + splitCleaned
+            + " unreferenced parent region(s)");
+      } else if (LOG.isTraceEnabled()) {
+        LOG.trace("Scanned " + count + " catalog row(s), gc'd " + mergeCleaned
+            + " unreferenced merged region(s) and " + splitCleaned
+            + " unreferenced parent region(s)");
+      }
+      return mergeCleaned + splitCleaned;
     } finally {
       alreadyRunning.set(false);
     }
@@ -334,30 +343,34 @@ public class CatalogJanitor extends ScheduledChore {
    */
   boolean cleanParent(final HRegionInfo parent, Result rowContent)
   throws IOException {
+    boolean result = false;
     // Check whether it is a merged region and not clean reference
     // No necessary to check MERGEB_QUALIFIER because these two qualifiers will
     // be inserted/deleted together
-    if (rowContent.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
+    if (rowContent.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.MERGEA_QUALIFIER) != null) {
+      // wait for the merge region to be cleaned first
-      return false;
+      return result;
     }
     // Run checks on each daughter split.
     PairOfSameType<HRegionInfo> daughters = MetaTableAccessor.getDaughterRegions(rowContent);
     Pair<Boolean, Boolean> a = checkDaughterInFs(parent, daughters.getFirst());
     Pair<Boolean, Boolean> b = checkDaughterInFs(parent, daughters.getSecond());
     if (hasNoReferences(a) && hasNoReferences(b)) {
-      String daughterA = daughters.getFirst() != null?
-          daughters.getFirst().getShortNameToLog(): "null";
-      String daughterB = daughters.getSecond() != null?
-          daughters.getSecond().getShortNameToLog(): "null";
-      LOG.debug("Deleting region " + parent.getShortNameToLog() +
-        " because daughters -- " + daughterA + ", " + daughterB +
-        " -- no longer hold references");
-      ProcedureExecutor<MasterProcedureEnv> pe = this.services.getMasterProcedureExecutor();
-      pe.submitProcedure(new GCRegionProcedure(pe.getEnvironment(), parent));
-      return true;
+      LOG.debug("Deleting region " + parent.getRegionNameAsString() +
+        " because daughter splits no longer hold references");
+      FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
+      if (LOG.isTraceEnabled()) LOG.trace("Archiving parent region: " + parent);
+      HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, parent);
+      MetaTableAccessor.deleteRegion(this.connection, parent);
+      services.getServerManager().removeRegion(parent);
+      FavoredNodesManager fnm = this.services.getFavoredNodesManager();
+      if (fnm != null) {
+        fnm.deleteFavoredNodesForRegions(Lists.newArrayList(parent));
+      }
+      result = true;
     }
-    return false;
+    return result;
   }
 
   /**
@@ -456,4 +469,4 @@ public class CatalogJanitor extends ScheduledChore {
     return cleanMergeRegion(region, mergeRegions.getFirst(),
         mergeRegions.getSecond());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
index 34a7633..faceba2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
@@ -61,7 +61,7 @@ public class DeadServer {
   /**
    * Whether a dead server is being processed currently.
    */
-  private volatile boolean processing = false;
+  private boolean processing = false;
 
   /**
    * A dead server that comes back alive has a different start code. The new start code should be
@@ -123,14 +123,14 @@ public class DeadServer {
    * @param sn ServerName for the dead server.
    */
   public synchronized void notifyServer(ServerName sn) {
-    if (LOG.isTraceEnabled()) { LOG.trace("Started processing " + sn); }
+    if (LOG.isDebugEnabled()) { LOG.debug("Started processing " + sn); }
     processing = true;
     numProcessing++;
   }
 
   public synchronized void finish(ServerName sn) {
     numProcessing--;
-    if (LOG.isTraceEnabled()) LOG.trace("Finished " + sn + "; numProcessing=" + numProcessing);
+    if (LOG.isDebugEnabled()) LOG.debug("Finished " + sn + "; numProcessing=" + numProcessing);
 
     assert numProcessing >= 0: "Number of dead servers in processing should always be non-negative";
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
new file mode 100644
index 0000000..fc3607f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Run bulk assign.  Does one RPC per regionserver, passing a
+ * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
+ */
+@InterfaceAudience.Private
+public class GeneralBulkAssigner extends BulkAssigner {
+  private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
+
+  private Map<ServerName, List<HRegionInfo>> failedPlans = new ConcurrentHashMap<>();
+  private ExecutorService pool;
+
+  final Map<ServerName, List<HRegionInfo>> bulkPlan;
+  final AssignmentManager assignmentManager;
+  final boolean waitTillAllAssigned;
+
+  public GeneralBulkAssigner(final Server server,
+      final Map<ServerName, List<HRegionInfo>> bulkPlan,
+      final AssignmentManager am, final boolean waitTillAllAssigned) {
+    super(server);
+    this.bulkPlan = bulkPlan;
+    this.assignmentManager = am;
+    this.waitTillAllAssigned = waitTillAllAssigned;
+  }
+
+  @Override
+  protected String getThreadNamePrefix() {
+    return this.server.getServerName() + "-GeneralBulkAssigner";
+  }
+
+  @Override
+  protected void populatePool(ExecutorService pool) {
+    this.pool = pool; // shut it down later in case some assigner hangs
+    for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
+      pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
+        this.assignmentManager, this.failedPlans));
+    }
+  }
+
+  /**
+   *
+   * @param timeout How long to wait.
+   * @return true if done.
+   */
+  @Override
+  protected boolean waitUntilDone(final long timeout)
+  throws InterruptedException {
+    Set<HRegionInfo> regionSet = new HashSet<>();
+    for (List<HRegionInfo> regionList : bulkPlan.values()) {
+      regionSet.addAll(regionList);
+    }
+
+    pool.shutdown(); // no more tasks allowed
+    int serverCount = bulkPlan.size();
+    int regionCount = regionSet.size();
+    long startTime = System.currentTimeMillis();
+    long rpcWaitTime = startTime + timeout;
+    while (!server.isStopped() && !pool.isTerminated()
+        && rpcWaitTime > System.currentTimeMillis()) {
+      if (failedPlans.isEmpty()) {
+        pool.awaitTermination(100, TimeUnit.MILLISECONDS);
+      } else {
+        reassignFailedPlans();
+      }
+    }
+    if (!pool.isTerminated()) {
+      LOG.warn("bulk assigner is still running after "
+        + (System.currentTimeMillis() - startTime) + "ms, shut it down now");
+      // some assigner hangs, can't wait any more, shutdown the pool now
+      List<Runnable> notStarted = pool.shutdownNow();
+      if (notStarted != null && !notStarted.isEmpty()) {
+        server.abort("some single server assigner hasn't started yet"
+          + " when the bulk assigner timed out", null);
+        return false;
+      }
+    }
+
+    int reassigningRegions = 0;
+    if (!failedPlans.isEmpty() && !server.isStopped()) {
+      reassigningRegions = reassignFailedPlans();
+    }
+    assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+      reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
+
+    if (LOG.isDebugEnabled()) {
+      long elapsedTime = System.currentTimeMillis() - startTime;
+      String status = "successfully";
+      if (!regionSet.isEmpty()) {
+        status = "with " + regionSet.size() + " regions still in transition";
+      }
+      LOG.debug("bulk assigning total " + regionCount + " regions to "
+        + serverCount + " servers, took " + elapsedTime + "ms, " + status);
+    }
+    return regionSet.isEmpty();
+  }
+
+  @Override
+  protected long getTimeoutOnRIT() {
+    // Guess timeout.  Multiply the max number of regions on a server
+    // by how long we think one region takes opening.
+    Configuration conf = server.getConfiguration();
+    long perRegionOpenTimeGuesstimate =
+      conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
+    int maxRegionsPerServer = 1;
+    for (List<HRegionInfo> regionList : bulkPlan.values()) {
+      int size = regionList.size();
+      if (size > maxRegionsPerServer) {
+        maxRegionsPerServer = size;
+      }
+    }
+    long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
+      + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
+      + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
+        30000) * bulkPlan.size();
+    LOG.debug("Timeout-on-RIT=" + timeout);
+    return timeout;
+  }
+
+  @Override
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        LOG.warn("Assigning regions in " + t.getName(), e);
+      }
+    };
+  }
+
+  private int reassignFailedPlans() {
+    List<HRegionInfo> reassigningRegions = new ArrayList<>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
+      LOG.info("Failed assigning " + e.getValue().size()
+          + " regions to server " + e.getKey() + ", reassigning them");
+      reassigningRegions.addAll(failedPlans.remove(e.getKey()));
+    }
+    RegionStates regionStates = assignmentManager.getRegionStates();
+    for (HRegionInfo region : reassigningRegions) {
+      if (!regionStates.isRegionOnline(region)) {
+        assignmentManager.invokeAssign(region);
+      }
+    }
+    return reassigningRegions.size();
+  }
+
+  /**
+   * Manage bulk assigning to a server.
+   */
+  static class SingleServerBulkAssigner implements Runnable {
+    private final ServerName regionserver;
+    private final List<HRegionInfo> regions;
+    private final AssignmentManager assignmentManager;
+    private final Map<ServerName, List<HRegionInfo>> failedPlans;
+
+    SingleServerBulkAssigner(final ServerName regionserver,
+        final List<HRegionInfo> regions, final AssignmentManager am,
+        final Map<ServerName, List<HRegionInfo>> failedPlans) {
+      this.regionserver = regionserver;
+      this.regions = regions;
+      this.assignmentManager = am;
+      this.failedPlans = failedPlans;
+    }
+
+    @Override
+    public void run() {
+      try {
+       if (!assignmentManager.assign(regionserver, regions)) {
+         failedPlans.put(regionserver, regions);
+       }
+      } catch (Throwable t) {
+        LOG.warn("Failed bulking assigning " + regions.size()
+            + " region(s) to " + regionserver.getServerName()
+            + ", and continue to bulk assign others", t);
+        failedPlans.put(regionserver, regions);
+      }
+    }
+  }
+}
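
To make getTimeoutOnRIT()'s guesstimate concrete: with the defaults above (1000 ms per region open, a 60000 ms RPC startup wait, and 30000 ms of RPC wait per region server), a bulk plan spanning five servers whose largest per-server batch holds 100 regions yields 1000 * 100 + 60000 + 30000 * 5 = 310000 ms, a little over five minutes. A sketch with those illustrative numbers (they are assumptions, not values from the patch):

// Re-derivation of the getTimeoutOnRIT() formula with hypothetical inputs.
class TimeoutOnRitExample {
  public static void main(String[] args) {
    long perRegionOpenTime = 1000L;   // hbase.bulk.assignment.perregion.open.time default
    long rpcStartupWait = 60000L;     // hbase.regionserver.rpc.startup.waittime default
    long perServerRpcWait = 30000L;   // hbase.bulk.assignment.perregionserver.rpc.waittime default
    int maxRegionsPerServer = 100;    // largest region list in the bulk plan (assumed)
    int serverCount = 5;              // bulkPlan.size() (assumed)
    long timeout = perRegionOpenTime * maxRegionsPerServer
        + rpcStartupWait
        + perServerRpcWait * serverCount;
    System.out.println("Timeout-on-RIT=" + timeout);  // prints Timeout-on-RIT=310000
  }
}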

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 83f5a1c..4dd6353 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -36,8 +36,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -91,10 +90,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@@ -115,15 +110,16 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
-import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.master.procedure.MergeTableRegionsProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
+import org.apache.hadoop.hbase.master.procedure.SplitTableRegionProcedure;
 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
 import org.apache.hadoop.hbase.master.replication.ReplicationManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -346,6 +342,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   private RegionNormalizerChore normalizerChore;
   private ClusterStatusChore clusterStatusChore;
   private ClusterStatusPublisher clusterStatusPublisherChore = null;
+  private PeriodicDoMetrics periodicDoMetricsChore = null;
 
   CatalogJanitor catalogJanitorChore;
   private ReplicationMetaCleaner replicationMetaCleaner;
@@ -446,6 +443,19 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
   }
 
+  private static class PeriodicDoMetrics extends ScheduledChore {
+    private final HMaster server;
+    public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
+      super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
+      this.server = server;
+    }
+
+    @Override
+    protected void chore() {
+      server.doMetrics();
+    }
+  }
+
   /**
    * Initializes the HMaster. The steps are as follows:
    * <p>
@@ -648,6 +658,20 @@ public class HMaster extends HRegionServer implements MasterServices {
     return MasterDumpServlet.class;
   }
 
+  /**
+   * Emit the HMaster metrics, such as region in transition metrics.
+   * Wrapped in a try block just to be sure a metrics problem doesn't abort the HMaster.
+   */
+  private void doMetrics() {
+    try {
+      if (assignmentManager != null) {
+        assignmentManager.updateRegionsInTransitionMetrics();
+      }
+    } catch (Throwable e) {
+      LOG.error("Couldn't update metrics: " + e.getMessage());
+    }
+  }
+
   MetricsMaster getMasterMetrics() {
     return metricsMaster;
   }
@@ -670,9 +694,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
     this.splitOrMergeTracker.start();
 
-    // Create Assignment Manager
-    this.assignmentManager = new AssignmentManager(this);
-    this.assignmentManager.start();
+    this.assignmentManager = new AssignmentManager(this, serverManager,
+      this.balancer, this.service, this.metricsMaster, tableStateManager);
 
     this.replicationManager = new ReplicationManager(conf, zooKeeper, this);
 
@@ -863,6 +886,10 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.catalogJanitorChore = new CatalogJanitor(this);
     getChoreService().scheduleChore(catalogJanitorChore);
 
+    // Do Metrics periodically
+    periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
+    getChoreService().scheduleChore(periodicDoMetricsChore);
+
     status.setStatus("Starting cluster schema service");
     initClusterSchemaService();
 
@@ -875,8 +902,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
 
     status.markComplete("Initialization successful");
-    LOG.info(String.format("Master has completed initialization %.3fsec",
-       (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
+    LOG.info("Master has completed initialization");
     configurationManager.registerObserver(this.balancer);
     configurationManager.registerObserver(this.hfileCleaner);
 
@@ -985,8 +1011,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Check zk for region servers that are up but didn't register
     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
       // The isServerOnline check is opportunistic, correctness is handled inside
-      if (!this.serverManager.isServerOnline(sn) &&
-          serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
+      if (!this.serverManager.isServerOnline(sn)
+          && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
       }
     }
@@ -1119,6 +1145,12 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
+  protected void sendShutdownInterrupt() {
+    super.sendShutdownInterrupt();
+    stopProcedureExecutor();
+  }
+
+  @Override
   protected void stopServiceThreads() {
     if (masterJettyServer != null) {
       LOG.info("Stopping master jetty server");
@@ -1140,20 +1172,15 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Stopping service threads");
     }
-
     // Clean up and close up shop
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
     if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
     if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
-
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();
     if (this.assignmentManager != null) this.assignmentManager.stop();
-
-    stopProcedureExecutor();
-
     if (this.walManager != null) this.walManager.stop();
     if (this.fileSystemManager != null) this.fileSystemManager.stop();
     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
@@ -1163,9 +1190,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
     final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
-    // TODO: No cleaner currently!
-    final Path walArchiveDir = new Path(HFileArchiveUtil.getArchivePath(this.conf),
-        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
 
     final FileSystem walFs = walDir.getFileSystem(conf);
 
@@ -1179,7 +1203,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
       HConstants.DEFAULT_WAL_STORAGE_POLICY);
 
-    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir,
+    procedureStore = new WALProcedureStore(conf, walFs, walDir,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
@@ -1194,20 +1218,16 @@ public class HMaster extends HRegionServer implements MasterServices {
         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     procedureStore.start(numThreads);
     procedureExecutor.start(numThreads, abortOnCorruption);
-    procEnv.getRemoteDispatcher().start();
   }
 
   private void stopProcedureExecutor() {
     if (procedureExecutor != null) {
       configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
-      procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
       procedureExecutor.stop();
-      procedureExecutor = null;
     }
 
     if (procedureStore != null) {
       procedureStore.stop(isAborted());
-      procedureStore = null;
     }
   }
 
@@ -1237,6 +1257,9 @@ public class HMaster extends HRegionServer implements MasterServices {
       this.mobCompactThread.close();
     }
 
+    if (this.periodicDoMetricsChore != null) {
+      periodicDoMetricsChore.cancel();
+    }
     if (this.quotaObserverChore != null) {
       quotaObserverChore.cancel();
     }
@@ -1297,7 +1320,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Sleep to next balance plan start time
     // But if there are zero regions in transition, it can skip sleep to speed up.
     while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
-        && this.assignmentManager.getRegionStates().hasRegionsInTransition()) {
+        && this.assignmentManager.getRegionStates().getRegionsInTransitionCount() != 0) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException ie) {
@@ -1308,7 +1331,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Throttling by max number regions in transition
     while (!interrupted
         && maxRegionsInTransition > 0
-        && this.assignmentManager.getRegionStates().getRegionsInTransition().size()
+        && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
         >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
       try {
         // sleep if the number of regions in transition exceeds the limit
@@ -1341,26 +1364,21 @@ public class HMaster extends HRegionServer implements MasterServices {
     synchronized (this.balancer) {
       // If balance not true, don't run balancer.
       if (!this.loadBalancerTracker.isBalancerOn()) return false;
-        // Only allow one balance run at at time.
-      if (this.assignmentManager.hasRegionsInTransition()) {
-        List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
+      // Only allow one balance run at a time.
+      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
+        Set<RegionState> regionsInTransition =
+          this.assignmentManager.getRegionStates().getRegionsInTransition();
         // if hbase:meta region is in transition, result of assignment cannot be recorded
         // ignore the force flag in that case
-        boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
+        boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
         String prefix = force && !metaInTransition ? "R" : "Not r";
-        List<RegionStateNode> toPrint = regionsInTransition;
-        int max = 5;
-        boolean truncated = false;
-        if (regionsInTransition.size() > max) {
-          toPrint = regionsInTransition.subList(0, max);
-          truncated = true;
-        }
-        LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
-          " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
+        LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
+          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
+            abbreviate(regionsInTransition.toString(), 256));
         if (!force || metaInTransition) return false;
       }
       if (this.serverManager.areDeadServersInProgress()) {
-        LOG.info("Not running balancer because processing dead regionserver(s): " +
+        LOG.debug("Not running balancer because processing dead regionserver(s): " +
           this.serverManager.getDeadServers());
         return false;
       }
@@ -1385,7 +1403,7 @@ public class HMaster extends HRegionServer implements MasterServices {
       //Give the balancer the current cluster state.
       this.balancer.setClusterStatus(getClusterStatus());
       this.balancer.setClusterLoad(
-              this.assignmentManager.getRegionStates().getAssignmentsByTable());
+              this.assignmentManager.getRegionStates().getAssignmentsByTable(true));
 
       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
@@ -1404,7 +1422,7 @@ public class HMaster extends HRegionServer implements MasterServices {
         for (RegionPlan plan: plans) {
           LOG.info("balance " + plan);
           //TODO: bulk assign
-          this.assignmentManager.moveAsync(plan);
+          this.assignmentManager.balance(plan);
           rpCount++;
 
           balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
@@ -1520,59 +1538,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
-  public long dispatchMergingRegions(
-      final HRegionInfo regionInfoA,
-      final HRegionInfo regionInfoB,
-      final boolean forcible,
-      final long nonceGroup,
-      final long nonce) throws IOException {
-    checkInitialized();
-
-    TableName tableName = regionInfoA.getTable();
-    if (tableName == null || regionInfoB.getTable() == null) {
-      throw new UnknownRegionException ("Can't merge regions without table associated");
-    }
-
-    if (!tableName.equals(regionInfoB.getTable())) {
-      throw new IOException ("Cannot merge regions from two different tables");
-    }
-
-    if (regionInfoA.compareTo(regionInfoB) == 0) {
-      throw new MergeRegionException(
-        "Cannot merge a region to itself " + regionInfoA + ", " + regionInfoB);
-    }
-
-    final HRegionInfo [] regionsToMerge = new HRegionInfo[2];
-    regionsToMerge [0] = regionInfoA;
-    regionsToMerge [1] = regionInfoB;
-
-    return MasterProcedureUtil.submitProcedure(
-        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
-      @Override
-      protected void run() throws IOException {
-        MasterCoprocessorHost mcph = getMaster().getMasterCoprocessorHost();
-        if (mcph != null) {
-          mcph.preDispatchMerge(regionInfoA, regionInfoB);
-        }
-
-        LOG.info(getClientIdAuditPrefix() + " Dispatch merge regions " +
-          regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName());
-
-        submitProcedure(new DispatchMergingRegionsProcedure(
-            procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible));
-        if (mcph != null) {
-          mcph.postDispatchMerge(regionInfoA, regionInfoB);
-        }
-      }
-
-      @Override
-      protected String getDescription() {
-        return "DispatchMergingRegionsProcedure";
-      }
-    });
-  }
-
-  @Override
   public long mergeRegions(
       final HRegionInfo[] regionsToMerge,
       final boolean forcible,
@@ -1615,38 +1580,40 @@ public class HMaster extends HRegionServer implements MasterServices {
 
       @Override
       protected String getDescription() {
-        return "MergeTableProcedure";
+        return "DisableTableProcedure";
       }
     });
   }
 
   @Override
-  public long splitRegion(final HRegionInfo regionInfo, final byte[] splitRow,
-      final long nonceGroup, final long nonce)
-  throws IOException {
+  public long splitRegion(
+      final HRegionInfo regionInfo,
+      final byte[] splitRow,
+      final long nonceGroup,
+      final long nonce) throws IOException {
     checkInitialized();
+
     return MasterProcedureUtil.submitProcedure(
         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
       @Override
       protected void run() throws IOException {
         getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
-        LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());
+
+        LOG.info(getClientIdAuditPrefix() + " Split region " + regionInfo);
 
         // Execute the operation asynchronously
-        submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
+        submitProcedure(new SplitTableRegionProcedure(procedureExecutor.getEnvironment(),
+            regionInfo, splitRow));
       }
 
       @Override
       protected String getDescription() {
-        return "SplitTableProcedure";
+        return "DisableTableProcedure";
       }
     });
   }
 
-  // Public so can be accessed by tests. Blocks until move is done.
-  // Replace with an async implementation from which you can get
-  // a success/failure result.
-  @VisibleForTesting
+  @VisibleForTesting // Public so can be accessed by tests.
   public void move(final byte[] encodedRegionName,
       final byte[] destServerName) throws HBaseIOException {
     RegionState regionState = assignmentManager.getRegionStates().
@@ -1697,8 +1664,6 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     // Now we can do the move
     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
-    assert rp.getDestination() != null: rp.toString() + " " + dest;
-    assert rp.getSource() != null: rp.toString();
 
     try {
       checkInitialized();
@@ -1707,20 +1672,13 @@ public class HMaster extends HRegionServer implements MasterServices {
           return;
         }
       }
-      // Warmup the region on the destination before initiating the move. this call
+      // warmup the region on the destination before initiating the move. this call
       // is synchronous and takes some time. doing it before the source region gets
       // closed
       serverManager.sendRegionWarmup(rp.getDestination(), hri);
 
       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
-      Future<byte []> future = this.assignmentManager.moveAsync(rp);
-      try {
-        // Is this going to work? Will we throw exception on error?
-        // TODO: CompletableFuture rather than this stunted Future.
-        future.get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new HBaseIOException(e);
-      }
+      this.assignmentManager.balance(rp);
       if (this.cpHost != null) {
         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
       }
@@ -2059,7 +2017,7 @@ public class HMaster extends HRegionServer implements MasterServices {
           status.cleanup();
         }
       }
-    }, getServerName().toShortString() + ".masterManager"));
+    }, getServerName().toShortString() + ".activeMasterManager"));
   }
 
   private void checkCompression(final HTableDescriptor htd)
@@ -2512,9 +2470,8 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     String clusterId = fileSystemManager != null ?
       fileSystemManager.getClusterId().toString() : null;
-    List<RegionState> regionsInTransition = assignmentManager != null ?
-      assignmentManager.getRegionStates().getRegionsStateInTransition() : null;
-
+    Set<RegionState> regionsInTransition = assignmentManager != null ?
+      assignmentManager.getRegionStates().getRegionsInTransition() : null;
     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
     boolean balancerOn = loadBalancerTracker != null ?
       loadBalancerTracker.isBalancerOn() : false;
@@ -2722,7 +2679,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
   }
 
-  @Override
   public ProcedureEvent getInitializedEvent() {
     return initialized;
   }
@@ -2833,7 +2789,7 @@ public class HMaster extends HRegionServer implements MasterServices {
    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
    */
   public static void main(String [] args) {
-    LOG.info("STARTING service '" + HMaster.class.getSimpleName());
+    LOG.info("***** STARTING service '" + HMaster.class.getSimpleName() + "' *****");
     VersionInfo.logVersion();
     new HMasterCommandLine(HMaster.class).doMain(args);
   }
@@ -3278,7 +3234,6 @@ public class HMaster extends HRegionServer implements MasterServices {
    * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
    * @return The state of the switch
    */
-  @Override
   public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
     if (null == splitOrMergeTracker || isInMaintenanceMode()) {
       return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 4611982..129fa7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -45,7 +45,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
  * locations for all Regions in a cluster.
  *
  * <p>This class produces plans for the
- * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager}
+ * {@link org.apache.hadoop.hbase.master.AssignmentManager}
  * to execute.
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 4d18ac9..6064f9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -810,28 +810,6 @@ public class MasterCoprocessorHost
     });
   }
 
-  public void preDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB)
-  throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
-      @Override
-      public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preDispatchMerge(ctx, regionInfoA, regionInfoB);
-      }
-    });
-  }
-
-  public void postDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB)
-  throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
-      @Override
-      public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postDispatchMerge(ctx, regionInfoA, regionInfoB);
-      }
-    });
-  }
-
   public void preMergeRegions(final HRegionInfo[] regionsToMerge)
       throws IOException {
     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
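
Every hook in this host follows the dispatch pattern visible above: wrap the observer callback in a CoprocessorOperation and run it through execOperation. On the observer side, that callback looks roughly like the sketch below, which only logs the request. The class name is invented, and extending BaseMasterObserver (present on this branch) is an assumption made to keep the example short.

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;

public class MergeAuditObserver extends BaseMasterObserver {
  @Override
  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      final HRegionInfo[] regionsToMerge) throws IOException {
    // Invoked by MasterCoprocessorHost#preMergeRegions above, before the merge starts.
    for (HRegionInfo hri : regionsToMerge) {
      System.out.println("About to merge: " + hri.getRegionNameAsString());
    }
    // Throwing an IOException here vetoes the merge.
  }
}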

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
index a48444c..a921ab5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.util.Date;
 import java.util.Map;
+import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -32,8 +33,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
 import org.apache.hadoop.hbase.monitoring.LogMonitoring;
 import org.apache.hadoop.hbase.monitoring.StateDumpServlet;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -118,8 +117,9 @@ public class MasterDumpServlet extends StateDumpServlet {
       return;
     }
 
-    for (RegionStateNode rs : am.getRegionsInTransition()) {
-      String rid = rs.getRegionInfo().getEncodedName();
+    Set<RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition();
+    for (RegionState rs : regionsInTransition) {
+      String rid = rs.getRegion().getRegionNameAsString();
       out.println("Region " + rid + ": " + rs.toDescriptiveString());
     }
   }
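
Factored out, the restored RIT dump amounts to the helper sketched here. It uses only calls visible in the hunk (getRegionStates(), getRegionsInTransition(), RegionState#getRegion(), RegionState#toDescriptiveString()); the class and method names are invented.

import java.io.PrintWriter;
import java.util.Set;

import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionState;

public final class RitDump {
  /** Write one line per region in transition, as MasterDumpServlet does above. */
  static void dump(final AssignmentManager am, final PrintWriter out) {
    Set<RegionState> rits = am.getRegionStates().getRegionsInTransition();
    for (RegionState rs : rits) {
      out.println("Region " + rs.getRegion().getRegionNameAsString()
          + ": " + rs.toDescriptiveString());
    }
  }
}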

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index 049e659..1988e2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -32,8 +33,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -107,7 +108,14 @@ public class MasterMetaBootstrap {
   }
 
   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
-    master.getMasterWalManager().splitMetaLog(currentMetaServer);
+    if (RecoveryMode.LOG_REPLAY == master.getMasterWalManager().getLogRecoveryMode()) {
+      // In log replay mode, mark the hbase:meta region as recovering in ZK
+      master.getMasterWalManager().prepareLogReplay(currentMetaServer,
+        Collections.<HRegionInfo>singleton(HRegionInfo.FIRST_META_REGIONINFO));
+    } else {
+      // In recovered.edits mode: create recovered.edits files for the hbase:meta server
+      master.getMasterWalManager().splitMetaLog(currentMetaServer);
+    }
   }
 
   private void unassignExcessMetaReplica(int numMetaReplicasConfigured) {
@@ -143,9 +151,7 @@ public class MasterMetaBootstrap {
 
     // Work on meta region
     int assigned = 0;
-    // TODO: Unimplemented
-    // long timeout =
-    //   master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
+    long timeout = master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000);
     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
       status.setStatus("Assigning hbase:meta region");
     } else {
@@ -154,10 +160,37 @@ public class MasterMetaBootstrap {
 
     // Get current meta state from zk.
     RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper(), replicaId);
-    LOG.debug("meta state from zookeeper: " + metaState);
-    HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(
-      HRegionInfo.FIRST_META_REGIONINFO, replicaId);
-    assignmentManager.assignMeta(hri, metaState.getServerName());
+    HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
+        replicaId);
+    RegionStates regionStates = assignmentManager.getRegionStates();
+    regionStates.createRegionState(hri, metaState.getState(),
+        metaState.getServerName(), null);
+
+    if (!metaState.isOpened() || !master.getMetaTableLocator().verifyMetaRegionLocation(
+        master.getClusterConnection(), master.getZooKeeper(), timeout, replicaId)) {
+      ServerName currentMetaServer = metaState.getServerName();
+      if (master.getServerManager().isServerOnline(currentMetaServer)) {
+        if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
+          LOG.info("Meta was in transition on " + currentMetaServer);
+        } else {
+          LOG.info("Meta with replicaId " + replicaId + " was in transition on " +
+                    currentMetaServer);
+        }
+        assignmentManager.processRegionsInTransition(Collections.singletonList(metaState));
+      } else {
+        if (currentMetaServer != null) {
+          if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
+            splitMetaLogBeforeAssignment(currentMetaServer);
+            regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
+            previouslyFailedMetaRSs.add(currentMetaServer);
+          }
+        }
+        LOG.info("Re-assigning hbase:meta with replicaId, " + replicaId +
+            " it was on " + currentMetaServer);
+        assignmentManager.assignMeta(hri);
+      }
+      assigned++;
+    }
 
     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
       // TODO: should we prevent from using state manager before meta was initialized?
@@ -166,6 +199,14 @@ public class MasterMetaBootstrap {
         .setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
     }
 
+    if ((RecoveryMode.LOG_REPLAY == master.getMasterWalManager().getLogRecoveryMode())
+        && (!previouslyFailedMetaRSs.isEmpty())) {
+      // Log replay mode requires the new hbase:meta RS to be assigned first
+      status.setStatus("replaying log for Meta Region");
+      master.getMasterWalManager().splitMetaLog(previouslyFailedMetaRSs);
+    }
+
+    assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
     master.getTableStateManager().start();
 
     // Make sure a hbase:meta location is set. We need to enable SSH here since
@@ -173,7 +214,7 @@ public class MasterMetaBootstrap {
     // by SSH so that system tables can be assigned.
     // No need to wait for meta is assigned = 0 when meta is just verified.
     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
-    LOG.info("hbase:meta with replicaId " + replicaId + ", location="
+    LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
       + master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper(), replicaId));
     status.setStatus("META assigned.");
   }
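
The assignMeta branching restored above reduces to a three-way decision: a verified, opened meta needs nothing; a live carrier server means meta is merely in transition; a dead or unknown carrier means its logs must be split before reassignment. A condensed sketch of that decision follows, with invented names; only RegionState#isOpened() is taken from the hunk, and the two boolean inputs stand in for the verifyMetaRegionLocation() and isServerOnline() checks.

import org.apache.hadoop.hbase.master.RegionState;

public final class MetaBootstrapDecision {
  enum Action { NONE, PROCESS_RIT, SPLIT_LOGS_AND_ASSIGN }

  /** Mirror of the branching in assignMeta(...) above. */
  static Action decide(final RegionState metaState, final boolean locationVerified,
      final boolean carrierOnline) {
    if (metaState.isOpened() && locationVerified) {
      return Action.NONE; // meta is healthy where ZK says it is
    }
    return carrierOnline ? Action.PROCESS_RIT : Action.SPLIT_LOGS_AND_ASSIGN;
  }
}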

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index c43a4d1..296d4d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
@@ -45,7 +46,6 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.QosPriority;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
@@ -86,6 +85,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
@@ -136,6 +136,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -304,11 +306,7 @@ public class MasterRpcServices extends RSRpcServices
       ClusterStatusProtos.ServerLoad sl = request.getLoad();
       ServerName serverName = ProtobufUtil.toServerName(request.getServer());
       ServerLoad oldLoad = master.getServerManager().getLoad(serverName);
-      ServerLoad newLoad = new ServerLoad(sl);
-      master.getServerManager().regionServerReport(serverName, newLoad);
-      int version = VersionInfoUtil.getCurrentClientVersionNumber();
-      master.getAssignmentManager().reportOnlineRegions(serverName,
-        version, newLoad.getRegionsLoad().keySet());
+      master.getServerManager().regionServerReport(serverName, new ServerLoad(sl));
       if (sl != null && master.metricsMaster != null) {
         // Up our metrics.
         master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
@@ -381,25 +379,25 @@ public class MasterRpcServices extends RSRpcServices
   public AssignRegionResponse assignRegion(RpcController controller,
       AssignRegionRequest req) throws ServiceException {
     try {
-      master.checkInitialized();
+      final byte [] regionName = req.getRegion().getValue().toByteArray();
+      RegionSpecifierType type = req.getRegion().getType();
+      AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
 
-      final RegionSpecifierType type = req.getRegion().getType();
+      master.checkInitialized();
       if (type != RegionSpecifierType.REGION_NAME) {
         LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
           + " actual: " + type);
       }
-
-      final byte[] regionName = req.getRegion().getValue().toByteArray();
-      final HRegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
-      if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
-
-      final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
+      RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+      HRegionInfo regionInfo = regionStates.getRegionInfo(regionName);
+      if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
       if (master.cpHost != null) {
         if (master.cpHost.preAssign(regionInfo)) {
           return arr;
         }
       }
-      LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
+      LOG.info(master.getClientIdAuditPrefix()
+        + " assign " + regionInfo.getRegionNameAsString());
       master.getAssignmentManager().assign(regionInfo, true);
       if (master.cpHost != null) {
         master.cpHost.postAssign(regionInfo);
@@ -410,7 +408,6 @@ public class MasterRpcServices extends RSRpcServices
     }
   }
 
-
   @Override
   public BalanceResponse balance(RpcController controller,
       BalanceRequest request) throws ServiceException {
@@ -630,7 +627,8 @@ public class MasterRpcServices extends RSRpcServices
   }
 
   @Override
-  public SplitTableRegionResponse splitRegion(final RpcController controller,
+  public SplitTableRegionResponse splitRegion(
+      final RpcController controller,
       final SplitTableRegionRequest request) throws ServiceException {
     try {
       long procId = master.splitRegion(
@@ -1217,24 +1215,24 @@ public class MasterRpcServices extends RSRpcServices
   @Override
   public OfflineRegionResponse offlineRegion(RpcController controller,
       OfflineRegionRequest request) throws ServiceException {
+    final byte [] regionName = request.getRegion().getValue().toByteArray();
+    RegionSpecifierType type = request.getRegion().getType();
+    if (type != RegionSpecifierType.REGION_NAME) {
+      LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
+        + " actual: " + type);
+    }
+
     try {
       master.checkInitialized();
-
-      final RegionSpecifierType type = request.getRegion().getType();
-      if (type != RegionSpecifierType.REGION_NAME) {
-        LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
-          + " actual: " + type);
-      }
-
-      final byte[] regionName = request.getRegion().getValue().toByteArray();
-      final HRegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName);
-      if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
-
+      Pair<HRegionInfo, ServerName> pair =
+        MetaTableAccessor.getRegion(master.getConnection(), regionName);
+      if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
+      HRegionInfo hri = pair.getFirst();
       if (master.cpHost != null) {
         master.cpHost.preRegionOffline(hri);
       }
       LOG.info(master.getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString());
-      master.getAssignmentManager().offlineRegion(hri);
+      master.getAssignmentManager().regionOffline(hri);
       if (master.cpHost != null) {
         master.cpHost.postRegionOffline(hri);
       }
@@ -1419,7 +1417,26 @@ public class MasterRpcServices extends RSRpcServices
       ReportRegionStateTransitionRequest req) throws ServiceException {
     try {
       master.checkServiceStarted();
-      return master.getAssignmentManager().reportRegionStateTransition(req);
+      RegionStateTransition rt = req.getTransition(0);
+      RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+      for (RegionInfo ri : rt.getRegionInfoList())  {
+        TableName tableName = ProtobufUtil.toTableName(ri.getTableName());
+        if (!(TableName.META_TABLE_NAME.equals(tableName)
+            && regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null)
+              && !master.getAssignmentManager().isFailoverCleanupDone()) {
+          // Meta region is assigned before the master finishes the
+          // failover cleanup, so this check is not needed for it.
+          throw new PleaseHoldException("Master is rebuilding user regions");
+        }
+      }
+      ServerName sn = ProtobufUtil.toServerName(req.getServer());
+      String error = master.getAssignmentManager().onRegionTransition(sn, rt);
+      ReportRegionStateTransitionResponse.Builder rrtr =
+        ReportRegionStateTransitionResponse.newBuilder();
+      if (error != null) {
+        rrtr.setErrorMessage(error);
+      }
+      return rrtr.build();
     } catch (IOException ioe) {
       throw new ServiceException(ioe);
     }
@@ -2008,34 +2025,4 @@ public class MasterRpcServices extends RSRpcServices
       throw new ServiceException(e);
     }
   }
-
-  @Override
-  public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
-      DispatchMergingRegionsRequest request) throws ServiceException {
-    final byte[] encodedNameOfRegionA = request.getRegionA().getValue().toByteArray();
-    final byte[] encodedNameOfRegionB = request.getRegionB().getValue().toByteArray();
-    if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME ||
-        request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
-      LOG.warn("mergeRegions specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME +
-          " actual: region_a=" +
-          request.getRegionA().getType() + ", region_b=" +
-          request.getRegionB().getType());
-    }
-    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
-    RegionState regionStateA = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionA));
-    RegionState regionStateB = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionB));
-    if (regionStateA == null || regionStateB == null) {
-      throw new ServiceException(new UnknownRegionException(
-        Bytes.toStringBinary(regionStateA == null? encodedNameOfRegionA: encodedNameOfRegionB)));
-    }
-    final HRegionInfo regionInfoA = regionStateA.getRegion();
-    final HRegionInfo regionInfoB = regionStateB.getRegion();
-    try {
-      long procId = master.dispatchMergingRegions(regionInfoA, regionInfoB, request.getForcible(),
-          request.getNonceGroup(), request.getNonce());
-      return DispatchMergingRegionsResponse.newBuilder().setProcId(procId).build();
-    } catch (IOException ioe) {
-      throw new ServiceException(ioe);
-    }
-  }
 }
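
In the restored reportRegionStateTransition above, the AssignmentManager reports failure as a String (null meaning success), which is then folded into the protobuf response. Below is a sketch of just that response-building step, using the builder calls from the hunk; the wrapper class and method are invented.

import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

public final class TransitionResponses {
  /** An absent error message tells the regionserver the transition was accepted. */
  static ReportRegionStateTransitionResponse build(final String error) {
    ReportRegionStateTransitionResponse.Builder builder =
        ReportRegionStateTransitionResponse.newBuilder();
    if (error != null) {
      builder.setErrorMessage(error);
    }
    return builder.build();
  }
}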

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 781e907..4924d72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -32,9 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -42,14 +40,11 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.LockInfo;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Service;
 
 /**
@@ -128,12 +123,6 @@ public interface MasterServices extends Server {
   ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor();
 
   /**
-   * @return Tripped when Master has finished initialization.
-   */
-  @VisibleForTesting
-  public ProcedureEvent getInitializedEvent();
-
-  /**
    * Check table is modifiable; i.e. exists and is offline.
    * @param tableName Name of table to check.
    * @throws TableNotDisabledException
@@ -277,23 +266,6 @@ public interface MasterServices extends Server {
       throws IOException;
 
   /**
-   * Merge two regions. The real implementation is on the regionserver; the master
-   * just moves the regions together and sends a MERGE RPC to the regionserver.
-   * @param region_a region to merge
-   * @param region_b region to merge
-   * @param forcible true to force the merge; otherwise only two adjacent
-   *          regions will be merged
-   * @return procedure Id
-   * @throws IOException
-   */
-  long dispatchMergingRegions(
-    final HRegionInfo region_a,
-    final HRegionInfo region_b,
-    final boolean forcible,
-    final long nonceGroup,
-    final long nonce) throws IOException;
-
-  /**
    * Merge regions in a table.
    * @param regionsToMerge daughter regions to merge
   * @param forcible whether to force the merge even if the two regions are not adjacent
@@ -429,8 +401,6 @@ public interface MasterServices extends Server {
    */
   boolean isStopping();
 
-  boolean isSplitOrMergeEnabled(MasterSwitchType switchType);
-
   /**
    * @return Favored Nodes Manager
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 928702e..105fa29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -39,13 +41,12 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class abstracts a bunch of operations the HMaster needs
  * when splitting log files e.g. finding log files, dirs etc.
@@ -331,4 +332,16 @@ public class MasterWalManager {
       }
     }
   }
+
+  /**
+   * Used in SSH to set the recovery mode from configuration once all outstanding
+   * log split tasks have drained.
+   */
+  public void setLogRecoveryMode() throws IOException {
+    this.splitLogManager.setRecoveryMode(false);
+  }
+
+  public RecoveryMode getLogRecoveryMode() {
+    return this.splitLogManager.getRecoveryMode();
+  }
 }
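
Per the javadoc above, SSH calls setLogRecoveryMode() once outstanding split tasks have drained, and getLogRecoveryMode() is what MasterMetaBootstrap (earlier in this patch) branches on. A hedged usage sketch with invented helper names, using only the two accessors from the hunk:

import java.io.IOException;

import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

public final class RecoveryModeCheck {
  /** True when the cluster is replaying WAL edits rather than writing recovered.edits. */
  static boolean isLogReplay(final MasterWalManager walManager) {
    return walManager.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
  }

  /** Re-read the configured mode once no split tasks remain outstanding. */
  static void resetAfterDrain(final MasterWalManager walManager) throws IOException {
    walManager.setLogRecoveryMode();
  }
}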

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
index c7ce9a9..40e79ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 
 public class MetricsAssignmentManager {
+
   private final MetricsAssignmentManagerSource assignmentManagerSource;
 
   public MetricsAssignmentManager() {
@@ -32,11 +33,19 @@ public class MetricsAssignmentManager {
     return assignmentManagerSource;
   }
 
+  public void updateAssignmentTime(long time) {
+    assignmentManagerSource.updateAssignmentTime(time);
+  }
+
+  public void updateBulkAssignTime(long time) {
+    assignmentManagerSource.updateBulkAssignTime(time);
+  }
+
   /**
    * set new value for number of regions in transition.
    * @param ritCount
    */
-  public void updateRITCount(final int ritCount) {
+  public void updateRITCount(int ritCount) {
     assignmentManagerSource.setRIT(ritCount);
   }
 
@@ -45,15 +54,14 @@ public class MetricsAssignmentManager {
    * as defined by the property rit.metrics.threshold.time.
    * @param ritCountOverThreshold
    */
-  public void updateRITCountOverThreshold(final int ritCountOverThreshold) {
+  public void updateRITCountOverThreshold(int ritCountOverThreshold) {
     assignmentManagerSource.setRITCountOverThreshold(ritCountOverThreshold);
   }
-
   /**
    * update the timestamp for oldest region in transition metrics.
    * @param timestamp
    */
-  public void updateRITOldestAge(final long timestamp) {
+  public void updateRITOldestAge(long timestamp) {
     assignmentManagerSource.setRITOldestAge(timestamp);
   }
 
@@ -64,27 +72,4 @@ public class MetricsAssignmentManager {
   public void updateRitDuration(long duration) {
     assignmentManagerSource.updateRitDuration(duration);
   }
-
-  /*
-   * Increment the count of assignment operation (assign/unassign).
-   */
-  public void incrementOperationCounter() {
-    assignmentManagerSource.incrementOperationCounter();
-  }
-
-  /**
-   * Add the time taken to perform the last assign operation
-   * @param time
-   */
-  public void updateAssignTime(final long time) {
-    assignmentManagerSource.updateAssignTime(time);
-  }
-
-  /**
-   * Add the time taken to perform the last unassign operation
-   * @param time
-   */
-  public void updateUnassignTime(final long time) {
-    assignmentManagerSource.updateUnassignTime(time);
-  }
 }
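
The restored timing hooks are driven from the assignment path roughly as sketched below: time the operation, then publish the duration and the current RIT count. EnvironmentEdgeManager is the project's usual clock; the helper class and the Runnable stand-in are invented for the example.

import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

public final class AssignmentTiming {
  /** Run one assignment, then update the timing and RIT gauges restored above. */
  static void timedAssign(final MetricsAssignmentManager metrics, final Runnable assign,
      final int ritCount) {
    long start = EnvironmentEdgeManager.currentTime();
    assign.run(); // stand-in for the real assignment call
    metrics.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - start);
    metrics.updateRITCount(ritCount);
  }
}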

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java
deleted file mode 100644
index e119e88..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-// Based on HBaseIOE rather than PE because easier to integrate when an IOE.
-public class NoSuchProcedureException extends HBaseIOException {
-  public NoSuchProcedureException() {
-    super();
-  }
-
-  public NoSuchProcedureException(String s) {
-    super(s);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
index 17eb346..cd6b313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
@@ -135,8 +135,8 @@ public class RegionPlan implements Comparable<RegionPlan> {
 
   @Override
   public String toString() {
-    return "hri=" + this.hri.getRegionNameAsString() + ", source=" +
+    return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
       (this.source == null? "": this.source.toString()) +
-      ", destination=" + (this.dest == null? "": this.dest.toString());
+      ", dest=" + (this.dest == null? "": this.dest.toString());
   }
 }
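
With the toString restored above, a logged plan reads "hri=<region>, src=<server>, dest=<server>". A small demo follows; the three-argument RegionPlan constructor is an assumption inferred from the fields this class exposes, and all other names are invented.

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.RegionPlan;

public final class RegionPlanDemo {
  public static void main(String[] args) {
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("demo"));
    ServerName src = ServerName.valueOf("rs1.example.org", 16020, 1L);
    // A null source or destination prints as the empty string in toString above.
    RegionPlan plan = new RegionPlan(hri, src, null);
    System.out.println(plan); // hri=demo,..., src=rs1.example.org,16020,1, dest=
  }
}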

