hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [2/5] hbase git commit: HBASE-13616 Move ServerShutdownHandler to Pv2
Date Thu, 28 May 2015 22:04:40 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
index 0dd0c3d..af9eecf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -43,11 +44,12 @@ import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOpe
  * ProcedureRunnableSet for the Master Procedures.
  * This RunnableSet tries to provide to the ProcedureExecutor procedures
  * that can be executed without having to wait on a lock.
- * Most of the master operations can be executed concurrently, if the they
+ * Most of the master operations can be executed concurrently, if they
  * are operating on different tables (e.g. two create table can be performed
- * at the same, time assuming table A and table B).
+ * at the same time, assuming table A and table B) or against two different servers; say
+ * two servers that crashed at about the same time.
  *
- * Each procedure should implement an interface providing information for this queue.
+ * <p>Each procedure should implement an interface providing information for this queue.
  * for example table related procedures should implement TableProcedureInterface.
  * each procedure will be pushed in its own queue, and based on the operation type
  * we may take smarter decision. e.g. we can abort all the operations preceding
@@ -58,7 +60,18 @@ import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOpe
 public class MasterProcedureQueue implements ProcedureRunnableSet {
   private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
 
-  private final ProcedureFairRunQueues<TableName, RunQueue> fairq;
+  // Two queues to ensure that server procedures run ahead of table procedures always.
+  private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ;
+  /**
+   * Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the
+   * server that was carrying meta should rise to the top of the queue (this is how it used to
+   * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
+   * that were carrying system tables on crash; do I need to have these servers have priority?
+   * 
+   * <p>Apart from the special-casing of meta and system tables, fairq is what we want
+   */
+  private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
+
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
   private final TableLockManager lockManager;
@@ -66,11 +79,16 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
   private final int metaTablePriority;
   private final int userTablePriority;
   private final int sysTablePriority;
+  private static final int DEFAULT_SERVER_PRIORITY = 1;
 
+  /**
+   * Keeps count across server and table queues.
+   */
   private int queueSize;
 
   public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
-    this.fairq = new ProcedureFairRunQueues<TableName, RunQueue>(1);
+    this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1);
+    this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1);
     this.lockManager = lockManager;
 
     // TODO: should this be part of the HTD?
@@ -105,12 +123,13 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
 
   @Override
   public void yield(final Procedure proc) {
-    addFront(proc);
+    addBack(proc);
   }
 
   @Override
   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
   public Long poll() {
+    Long pollResult = null;
     lock.lock();
     try {
       if (queueSize == 0) {
@@ -119,19 +138,25 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
           return null;
         }
       }
-
-      RunQueue queue = fairq.poll();
-      if (queue != null && queue.isAvailable()) {
-        queueSize--;
-        return queue.poll();
+      // For now, let server handling have precedence over table handling; the presumption is
+      // that handling crashed servers is more important than running table operations such as
+      // enabling/disabling tables, etc.
+      pollResult = doPoll(serverFairQ.poll());
+      if (pollResult == null) {
+        pollResult = doPoll(tableFairQ.poll());
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      return null;
     } finally {
       lock.unlock();
     }
-    return null;
+    return pollResult;
+  }
+
+  private Long doPoll(final RunQueue rq) {
+    if (rq == null || !rq.isAvailable()) return null;
+    this.queueSize--;
+    return rq.poll();
   }
 
   @Override
@@ -148,7 +173,8 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
   public void clear() {
     lock.lock();
     try {
-      fairq.clear();
+      serverFairQ.clear();
+      tableFairQ.clear();
       queueSize = 0;
     } finally {
       lock.unlock();
@@ -169,7 +195,8 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
   public String toString() {
     lock.lock();
     try {
-      return "MasterProcedureQueue size=" + queueSize + ": " + fairq;
+      return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ +
+        ", serverFairQ: " + serverFairQ;
     } finally {
       lock.unlock();
     }
@@ -197,6 +224,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
         markTableAsDeleted(iProcTable.getTableName());
       }
     }
+    // No cleanup for ServerProcedureInterface types, yet.
   }
 
   private RunQueue getRunQueueOrCreate(final Procedure proc) {
@@ -204,17 +232,26 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
       final TableName table = ((TableProcedureInterface)proc).getTableName();
       return getRunQueueOrCreate(table);
     }
-    // TODO: at the moment we only have Table procedures
-    // if you are implementing a non-table procedure, you have two option create
-    // a group for all the non-table procedures or try to find a key for your
-    // non-table procedure and implement something similar to the TableRunQueue.
+    if (proc instanceof ServerProcedureInterface) {
+      return getRunQueueOrCreate((ServerProcedureInterface)proc);
+    }
+    // TODO: at the moment we only have Table and Server procedures
+    // if you are implementing a non-table/non-server procedure, you have two options: create
+    // a group for all the non-table/non-server procedures or try to find a key for your
+    // non-table/non-server procedures and implement something similar to the TableRunQueue.
     throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
   }
 
   private TableRunQueue getRunQueueOrCreate(final TableName table) {
     final TableRunQueue queue = getRunQueue(table);
     if (queue != null) return queue;
-    return (TableRunQueue)fairq.add(table, createTableRunQueue(table));
+    return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table));
+  }
+
+  private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) {
+    final ServerRunQueue queue = getRunQueue(spi.getServerName());
+    if (queue != null) return queue;
+    return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi));
   }
 
   private TableRunQueue createTableRunQueue(final TableName table) {
@@ -227,8 +264,35 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
     return new TableRunQueue(priority);
   }
 
+  private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) {
+    return new ServerRunQueue(DEFAULT_SERVER_PRIORITY);
+  }
+
   private TableRunQueue getRunQueue(final TableName table) {
-    return (TableRunQueue)fairq.get(table);
+    return (TableRunQueue)tableFairQ.get(table);
+  }
+
+  private ServerRunQueue getRunQueue(final ServerName sn) {
+    return (ServerRunQueue)serverFairQ.get(sn);
+  }
+
+  /**
+   * Try to acquire the write lock on the specified table.
+   * Other operations in the table-queue will be executed after the lock is released.
+   * @param table Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return true if we were able to acquire the lock on the table, otherwise false.
+   */
+  public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
+    return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose);
+  }
+
+  /**
+   * Release the write lock taken with tryAcquireTableWrite()
+   * @param table the name of the table that has the write lock
+   */
+  public void releaseTableExclusiveLock(final TableName table) {
+    getRunQueue(table).releaseExclusiveLock(lockManager, table);
   }
 
   /**
@@ -239,35 +303,54 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
    * @param purpose Human readable reason for locking the table
    * @return true if we were able to acquire the lock on the table, otherwise false.
    */
-  public boolean tryAcquireTableRead(final TableName table, final String purpose) {
-    return getRunQueueOrCreate(table).tryRead(lockManager, table, purpose);
+  public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
+    return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose);
   }
 
   /**
    * Release the read lock taken with tryAcquireTableRead()
    * @param table the name of the table that has the read lock
    */
-  public void releaseTableRead(final TableName table) {
-    getRunQueue(table).releaseRead(lockManager, table);
+  public void releaseTableSharedLock(final TableName table) {
+    getRunQueue(table).releaseSharedLock(lockManager, table);
   }
 
   /**
-   * Try to acquire the write lock on the specified table.
-   * other operations in the table-queue will be executed after the lock is released.
-   * @param table Table to lock
-   * @param purpose Human readable reason for locking the table
-   * @return true if we were able to acquire the lock on the table, otherwise false.
+   * Try to acquire the write lock on the specified server.
+   * @see #releaseServerExclusiveLock(ServerProcedureInterface)
+   * @param spi Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
    */
-  public boolean tryAcquireTableWrite(final TableName table, final String purpose) {
-    return getRunQueueOrCreate(table).tryWrite(lockManager, table, purpose);
+  public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) {
+    return getRunQueueOrCreate(spi).tryExclusiveLock();
   }
 
   /**
-   * Release the write lock taken with tryAcquireTableWrite()
-   * @param table the name of the table that has the write lock
+   * Release the write lock
+   * @see #tryAcquireServerExclusiveLock(ServerProcedureInterface)
+   * @param spi the server that has the write lock
    */
-  public void releaseTableWrite(final TableName table) {
-    getRunQueue(table).releaseWrite(lockManager, table);
+  public void releaseServerExclusiveLock(final ServerProcedureInterface spi) {
+    getRunQueue(spi.getServerName()).releaseExclusiveLock();
+  }
+
+  /**
+   * Try to acquire the read lock on the specified server.
+   * @see #releaseServerSharedLock(ServerProcedureInterface)
+   * @param spi Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
+   */
+  public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) {
+    return getRunQueueOrCreate(spi).trySharedLock();
+  }
+
+  /**
+   * Release the read lock taken
+   * @see #tryAcquireServerSharedLock(ServerProcedureInterface)
+   * @param spi the server that has the read lock
+   */
+  public void releaseServerSharedLock(final ServerProcedureInterface spi) {
+    getRunQueue(spi.getServerName()).releaseSharedLock();
   }
 
   /**
@@ -284,7 +367,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
       lock.lock();
       try {
         if (queue.isEmpty() && !queue.isLocked()) {
-          fairq.remove(table);
+          tableFairQ.remove(table);
 
           // Remove the table lock
           try {
@@ -311,114 +394,167 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
   }
 
   /**
-   * Run Queue for a Table. It contains a read-write lock that is used by the
-   * MasterProcedureQueue to decide if we should fetch an item from this queue
-   * or skip to another one which will be able to run without waiting for locks.
+   * Base abstract class for RunQueue implementations.
+   * Be careful honoring synchronizations in subclasses. In here we protect access but if you are
+   * acting on a state found in here, be sure dependent code keeps synchronization.
+   * Implements basic in-memory read/write locking mechanism to prevent procedure steps being run
+   * in parallel.
    */
-  private static class TableRunQueue implements RunQueue {
+  private static abstract class AbstractRunQueue implements RunQueue {
+    // All modification of runnables happens with #lock held.
     private final Deque<Long> runnables = new ArrayDeque<Long>();
     private final int priority;
+    private boolean exclusiveLock = false;
+    private int sharedLock = 0;
 
-    private TableLock tableLock = null;
-    private boolean wlock = false;
-    private int rlock = 0;
-
-    public TableRunQueue(int priority) {
+    public AbstractRunQueue(int priority) {
       this.priority = priority;
     }
 
+    boolean isEmpty() {
+      return this.runnables.isEmpty();
+    }
+
     @Override
-    public void addFront(final Procedure proc) {
-      runnables.addFirst(proc.getProcId());
+    public boolean isAvailable() {
+      synchronized (this) {
+        return !exclusiveLock && !runnables.isEmpty();
+      }
     }
 
-    // TODO: Improve run-queue push with TableProcedureInterface.getType()
-    //       we can take smart decisions based on the type of the operation (e.g. create/delete)
     @Override
-    public void addBack(final Procedure proc) {
-      runnables.addLast(proc.getProcId());
+    public int getPriority() {
+      return this.priority;
+    }
+
+    @Override
+    public void addFront(Procedure proc) {
+      this.runnables.addFirst(proc.getProcId());
+    }
+
+    @Override
+    public void addBack(Procedure proc) {
+      this.runnables.addLast(proc.getProcId());
     }
 
     @Override
     public Long poll() {
-      return runnables.poll();
+      return this.runnables.poll();
     }
 
     @Override
-    public boolean isAvailable() {
-      synchronized (this) {
-        return !wlock && !runnables.isEmpty();
-      }
+    public synchronized boolean isLocked() {
+      return isExclusiveLock() || sharedLock > 0;
+    }
+
+    public synchronized boolean isExclusiveLock() {
+      return this.exclusiveLock;
+    }
+
+    public synchronized boolean trySharedLock() {
+      if (isExclusiveLock()) return false;
+      sharedLock++;
+      return true;
+    }
+
+    public synchronized void releaseSharedLock() {
+      sharedLock--;
     }
 
-    public boolean isEmpty() {
-      return runnables.isEmpty();
+    /**
+     * @return True if only one instance of a shared lock outstanding.
+     */
+    synchronized boolean isSingleSharedLock() {
+      return sharedLock == 1;
     }
 
+    public synchronized boolean tryExclusiveLock() {
+      if (isLocked()) return false;
+      exclusiveLock = true;
+      return true;
+    }
+
+    public synchronized void releaseExclusiveLock() {
+      exclusiveLock = false;
+    }
+ 
     @Override
-    public boolean isLocked() {
-      synchronized (this) {
-        return wlock || rlock > 0;
-      }
+    public String toString() {
+      return this.runnables.toString();
     }
+  }
 
-    public boolean tryRead(final TableLockManager lockManager,
-        final TableName tableName, final String purpose) {
-      synchronized (this) {
-        if (wlock) {
-          return false;
-        }
+  /**
+   * Run Queue for Server procedures.
+   */
+  private static class ServerRunQueue extends AbstractRunQueue {
+    public ServerRunQueue(int priority) {
+      super(priority);
+    }
+  }
 
-        // Take zk-read-lock
-        tableLock = lockManager.readLock(tableName, purpose);
-        try {
-          tableLock.acquire();
-        } catch (IOException e) {
-          LOG.error("failed acquire read lock on " + tableName, e);
-          tableLock = null;
-          return false;
-        }
+  /**
+   * Run Queue for a Table. It contains a read-write lock that is used by the
+   * MasterProcedureQueue to decide if we should fetch an item from this queue
+   * or skip to another one which will be able to run without waiting for locks.
+   */
+  private static class TableRunQueue extends AbstractRunQueue {
+    private TableLock tableLock = null;
 
-        rlock++;
+    public TableRunQueue(int priority) {
+      super(priority);
+    }
+
+    // TODO: Improve run-queue push with TableProcedureInterface.getType()
+    //       we can take smart decisions based on the type of the operation (e.g. create/delete)
+    @Override
+    public void addBack(final Procedure proc) {
+      super.addBack(proc);
+    }
+
+    public synchronized boolean trySharedLock(final TableLockManager lockManager,
+        final TableName tableName, final String purpose) {
+      if (isExclusiveLock()) return false;
+
+      // Take zk-read-lock
+      tableLock = lockManager.readLock(tableName, purpose);
+      try {
+        tableLock.acquire();
+      } catch (IOException e) {
+        LOG.error("failed acquire read lock on " + tableName, e);
+        tableLock = null;
+        return false;
       }
+      trySharedLock();
       return true;
     }
 
-    public void releaseRead(final TableLockManager lockManager,
+    public synchronized void releaseSharedLock(final TableLockManager lockManager,
         final TableName tableName) {
-      synchronized (this) {
-        releaseTableLock(lockManager, rlock == 1);
-        rlock--;
-      }
+      releaseTableLock(lockManager, isSingleSharedLock());
+      releaseSharedLock();
     }
 
-    public boolean tryWrite(final TableLockManager lockManager,
+    public synchronized boolean tryExclusiveLock(final TableLockManager lockManager,
         final TableName tableName, final String purpose) {
-      synchronized (this) {
-        if (wlock || rlock > 0) {
-          return false;
-        }
-
-        // Take zk-write-lock
-        tableLock = lockManager.writeLock(tableName, purpose);
-        try {
-          tableLock.acquire();
-        } catch (IOException e) {
-          LOG.error("failed acquire write lock on " + tableName, e);
-          tableLock = null;
-          return false;
-        }
-        wlock = true;
+      if (isLocked()) return false;
+      // Take zk-write-lock
+      tableLock = lockManager.writeLock(tableName, purpose);
+      try {
+        tableLock.acquire();
+      } catch (IOException e) {
+        LOG.error("failed acquire write lock on " + tableName, e);
+        tableLock = null;
+        return false;
       }
+      tryExclusiveLock();
       return true;
     }
 
-    public void releaseWrite(final TableLockManager lockManager,
+    public synchronized void releaseExclusiveLock(final TableLockManager lockManager,
         final TableName tableName) {
-      synchronized (this) {
-        releaseTableLock(lockManager, true);
-        wlock = false;
-      }
+      releaseTableLock(lockManager, true);
+      releaseExclusiveLock();
     }
 
     private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
@@ -434,15 +570,5 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
         }
       }
     }
-
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-
-    @Override
-    public String toString() {
-      return runnables.toString();
-    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index 3de5202..10ad91a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -182,14 +182,14 @@ public class ModifyColumnFamilyProcedure
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
     if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableWrite(
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
       tableName,
       EventType.C_M_MODIFY_FAMILY.toString());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableWrite(tableName);
+    env.getProcedureQueue().releaseTableExclusiveLock(tableName);
   }
 
   @Override
@@ -379,4 +379,4 @@ public class ModifyColumnFamilyProcedure
       });
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index e9636e6..4e73e77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -214,14 +214,14 @@ public class ModifyTableProcedure
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
     if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableWrite(
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
       getTableName(),
       EventType.C_M_MODIFY_TABLE.toString());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableWrite(getTableName());
+    env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
   }
 
   @Override
@@ -507,4 +507,4 @@ public class ModifyTableProcedure
     }
     return regionInfoList;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
new file mode 100644
index 0000000..e72c4c0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ServerCrashState;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
+ * ServerShutdownHandler.
+ *
+ * <p>The procedure flow varies dependent on whether meta is assigned, if we are
+ * doing distributed log replay versus distributed log splitting, and if we are to split logs at
+ * all.
+ *
+ * <p>This procedure asks that all crashed servers get processed equally; we yield after the
+ * completion of each successful flow step. We do this so that we do not 'deadlock' waiting on
+ * a region assignment needed to replay edits — which could happen if a region moved and there
+ * are edits on two servers awaiting replay.
+ *
+ * <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit when assign is pv2.
+ * TODO: We do not have special handling for system tables.
+ */
+public class ServerCrashProcedure
+extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
+implements ServerProcedureInterface {
+  private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class);
+
+  /**
+   * Configuration key to set how long to wait in ms doing a quick check on meta state.
+   */
+  public static final String KEY_SHORT_WAIT_ON_META =
+      "hbase.master.servercrash.short.wait.on.meta.ms";
+
+  public static final int DEFAULT_SHORT_WAIT_ON_META = 1000;
+
+  /**
+   * Configuration key to set how many retries to cycle before we give up on meta.
+   * Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} milliseconds.
+   */
+  public static final String KEY_RETRIES_ON_META =
+      "hbase.master.servercrash.meta.retries";
+
+  public static final int DEFAULT_RETRIES_ON_META = 10;
+
+  /**
+   * Configuration key to set how long to wait in ms on regions in transition.
+   */
+  public static final String KEY_WAIT_ON_RIT =
+      "hbase.master.servercrash.wait.on.rit.ms";
+
+  public static final int DEFAULT_WAIT_ON_RIT = 30000;
+
+  private static final Set<HRegionInfo> META_REGION_SET = new HashSet<HRegionInfo>();
+  static {
+    META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO);
+  }
+
+  /**
+   * Name of the crashed server to process.
+   */
+  private ServerName serverName;
+
+  /**
+   * Regions that were on the crashed server.
+   */
+  private Set<HRegionInfo> regionsOnCrashedServer;
+
+  /**
+   * Regions to assign. Usually some subset of {@link #regionsOnCrashedServer}
+   */
+  private List<HRegionInfo> regionsToAssign;
+
+  private boolean distributedLogReplay = false;
+  private boolean carryingMeta = false;
+  private boolean shouldSplitWal;
+
+  /**
+   * Cycles on same state. Good for figuring if we are stuck.
+   */
+  private int cycles = 0;
+
+  /**
+   * Ordinal of the previous state. So we can tell if we are progressing or not. TODO: if useful,
+   * move this back up into StateMachineProcedure
+   */
+  private int previousState;
+
+  /**
+   * Call this constructor queuing up a Procedure.
+   * @param serverName Name of the crashed server.
+   * @param shouldSplitWal True if we should split WALs as part of crashed server processing.
+   * @param carryingMeta True if carrying hbase:meta table region.
+   */
+  public ServerCrashProcedure(final ServerName serverName,
+      final boolean shouldSplitWal, final boolean carryingMeta) {
+    this.serverName = serverName;
+    this.shouldSplitWal = shouldSplitWal;
+    this.carryingMeta = carryingMeta;
+    // Currently not used.
+  }
+
+  /**
+   * Used when deserializing from a procedure store; we'll construct one of these then call
+   * {@link #deserializeStateData(InputStream)}. Do not use directly.
+   */
+  public ServerCrashProcedure() {
+    super();
+  }
+
+  private void throwProcedureYieldException(final String msg) throws ProcedureYieldException {
+    String logMsg = msg + "; cycle=" + this.cycles + ", running for " +
+        StringUtils.formatTimeDiff(System.currentTimeMillis(), getStartTime());
+    // The procedure executor logs ProcedureYieldException at trace level. For now, log these
+    // yields for server crash processing at DEBUG. Revisit when stable.
+    if (LOG.isDebugEnabled()) LOG.debug(logMsg);
+    throw new ProcedureYieldException(logMsg);
+  }
+
+  /**
+   * Run one step of the server crash handling state machine and set the next state.
+   * Preconditions that are not yet met (master failover cleanup not done, hbase:meta
+   * not assigned, regions-in-transition not cleared) cause a yield via
+   * {@link #throwProcedureYieldException(String)} rather than a block. IOExceptions
+   * and interrupts are logged and the current state is retried (HAS_MORE_STATE).
+   */
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
+  throws ProcedureYieldException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(state);
+    }
+    // Keep running count of cycles; reset whenever we move to a new state.
+    if (state.ordinal() != this.previousState) {
+      this.previousState = state.ordinal();
+      this.cycles = 0;
+    } else {
+      this.cycles++;
+    }
+    MasterServices services = env.getMasterServices();
+    try {
+      switch (state) {
+      case SERVER_CRASH_START:
+        // Is master fully online? If not, yield. No processing of servers unless master is up
+        if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+          throwProcedureYieldException("Waiting on master failover to complete");
+        }
+        LOG.info("Start processing crashed " + this.serverName);
+        start(env);
+        // If carrying meta, process it first. Else, get list of regions on crashed server.
+        if (this.carryingMeta) setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
+        else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+        break;
+
+      case SERVER_CRASH_GET_REGIONS:
+        // If hbase:meta is not assigned, yield.
+        if (!isMetaAssignedQuickTest(env)) {
+          throwProcedureYieldException("Waiting on hbase:meta assignment");
+        }
+        this.regionsOnCrashedServer =
+          services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName);
+        // Where to go next? Depends on whether we should split logs at all or if we should do
+        // distributed log splitting (DLS) vs distributed log replay (DLR).
+        if (!this.shouldSplitWal) {
+          setNextState(ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN);
+        } else if (this.distributedLogReplay) {
+          setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+        }
+        break;
+
+      case SERVER_CRASH_PROCESS_META:
+        // If we fail processing hbase:meta, yield.
+        if (!processMeta(env)) {
+          throwProcedureYieldException("Waiting on regions-in-transition to clear");
+        }
+        setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+        break;
+
+      case SERVER_CRASH_PREPARE_LOG_REPLAY:
+        prepareLogReplay(env, this.regionsOnCrashedServer);
+        setNextState(ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN);
+        break;
+
+      case SERVER_CRASH_SPLIT_LOGS:
+        splitLogs(env);
+        // If DLR, go to FINISH. Otherwise, if DLS, go to SERVER_CRASH_CALC_REGIONS_TO_ASSIGN
+        if (this.distributedLogReplay) setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        else setNextState(ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN);
+        break;
+
+      case SERVER_CRASH_CALC_REGIONS_TO_ASSIGN:
+        this.regionsToAssign = calcRegionsToAssign(env);
+        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        break;
+
+      case SERVER_CRASH_ASSIGN:
+        // Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning
+        // which is what we are mimicking here but it looks prone to double assignment if assign
+        // fails midway. TODO: Test.
+
+        // If no regions to assign, skip assign and skip to the finish.
+        boolean regions = this.regionsToAssign != null && !this.regionsToAssign.isEmpty();
+        if (regions) {
+          if (!assign(env, this.regionsToAssign)) {
+            throwProcedureYieldException("Failed assign; will retry");
+          }
+        }
+        if (regions && this.shouldSplitWal && distributedLogReplay) {
+          setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        }
+        break;
+
+      case SERVER_CRASH_WAIT_ON_ASSIGN:
+        // TODO: The list of regionsToAssign may be more than we actually assigned. See down in
+        // AM #1629 around 'if (regionStates.wasRegionOnDeadServer(encodedName)) {' where we
+        // will skip assigning a region because it is/was on a dead server. Should never happen!
+        // It was on this server. Worst comes to worst, we'll still wait here till other server is
+        // processed.
+
+        // If the wait on assign failed, yield -- if we have regions to assign.
+        if (this.regionsToAssign != null && !this.regionsToAssign.isEmpty()) {
+          if (!waitOnAssign(env, this.regionsToAssign)) {
+            throwProcedureYieldException("Waiting on region assign");
+          }
+        }
+        setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+        break;
+
+      case SERVER_CRASH_FINISH:
+        LOG.info("Finished processing of crashed " + serverName);
+        services.getServerManager().getDeadServers().finish(serverName);
+        return Flow.NO_MORE_STATE;
+
+      default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + "; retry", e);
+    } catch (InterruptedException e) {
+      // TODO: Make executor allow IEs coming up out of execute.
+      LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + state + "; retry", e);
+      Thread.currentThread().interrupt();
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Start processing of crashed server. In here we'll just set configs. and return.
+   * @param env
+   * @throws IOException
+   */
+  private void start(final MasterProcedureEnv env) throws IOException {
+    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    // Set recovery mode late. This is what the old ServerShutdownHandler used do.
+    mfs.setLogRecoveryMode();
+    this.distributedLogReplay = mfs.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
+  }
+
+  /**
+   * @param env
+   * @return False if we fail to assign and split logs on meta ('process').
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean processMeta(final MasterProcedureEnv env)
+  throws IOException {
+    if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName);
+    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
+    if (this.shouldSplitWal) {
+      if (this.distributedLogReplay) {
+        prepareLogReplay(env, META_REGION_SET);
+      } else {
+        // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
+        mfs.splitMetaLog(serverName);
+        am.getRegionStates().logSplit(metaHRI);
+      }
+    }
+
+    // Assign meta if still carrying it. Check again: region may be assigned because of RIT timeout
+    boolean processed = true;
+    if (am.isCarryingMeta(serverName)) {
+      // TODO: May block here if hard time figuring state of meta.
+      am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
+      verifyAndAssignMetaWithRetries(env);
+      if (this.shouldSplitWal && distributedLogReplay) {
+        int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
+        if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) {
+          processed = false;
+        } else {
+          // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
+          mfs.splitMetaLog(serverName);
+        }
+      }
+    }
+    return processed;
+  }
+
+  /**
+   * @return True if region cleared RIT, else false if we timed out waiting.
+   * @throws InterruptedIOException
+   */
+  private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am,
+      final HRegionInfo hri, final int timeout)
+  throws InterruptedIOException {
+    try {
+      if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) {
+        // Wait here is to avoid log replay hits current dead server and incur a RPC timeout
+        // when replay happens before region assignment completes.
+        LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time");
+        return false;
+      }
+    } catch (InterruptedException ie) {
+      throw new InterruptedIOException("Caught " + ie +
+        " during waitOnRegionToClearRegionsInTransition for " + hri);
+    }
+    return true;
+  }
+
+  /**
+   * Mark the passed regions as recovering (distributed log replay) and record the
+   * crashed server's WAL as split in region states.
+   * @param regions Regions to mark recovering; either the full set that was on the
+   *   crashed server, or just the hbase:meta region set (see processMeta).
+   * @throws IOException
+   */
+  private void prepareLogReplay(final MasterProcedureEnv env, final Set<HRegionInfo> regions)
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      // Report the count of the regions actually passed in, not regionsOnCrashedServer;
+      // the two differ when we are preparing replay for hbase:meta only.
+      LOG.debug("Mark " + size(regions) +
+        " regions-in-recovery from " + this.serverName);
+    }
+    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    mfs.prepareLogReplay(this.serverName, regions);
+    am.getRegionStates().logSplit(this.serverName);
+  }
+
+  private void splitLogs(final MasterProcedureEnv env) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Splitting logs from " + serverName + "; region count=" +
+        size(this.regionsOnCrashedServer));
+    }
+    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
+    mfs.splitLog(this.serverName);
+    am.getRegionStates().logSplit(this.serverName);
+  }
+
+  /** @return 0 when {@code hris} is null, else its element count. */
+  static int size(final Collection<HRegionInfo> hris) {
+    if (hris == null) {
+      return 0;
+    }
+    return hris.size();
+  }
+
+  /**
+   * Figure out what we need to assign. Should be idempotent.
+   * @param env
+   * @return List of calculated regions to assign; may be empty or null.
+   * @throws IOException
+   */
+  private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env)
+  throws IOException {
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    List<HRegionInfo> regionsToAssignAggregator = new ArrayList<HRegionInfo>();
+    int replicaCount = env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM,
+      HConstants.DEFAULT_META_REPLICA_NUM);
+    for (int i = 1; i < replicaCount; i++) {
+      HRegionInfo metaHri =
+          RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i);
+      if (am.isCarryingMetaReplica(this.serverName, metaHri)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Reassigning meta replica" + metaHri + " that was on " + this.serverName);
+        }
+        regionsToAssignAggregator.add(metaHri);
+      }
+    }
+    // Clean out anything in regions in transition.
+    List<HRegionInfo> regionsInTransition = am.cleanOutCrashedServerReferences(serverName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) +
+        " region(s) that " + (serverName == null? "null": serverName)  +
+        " was carrying (and " + regionsInTransition.size() +
+        " regions(s) that were opening on this server)");
+    }
+    regionsToAssignAggregator.addAll(regionsInTransition);
+
+    // Iterate regions that were on this server and figure which of these we need to reassign
+    if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
+      RegionStates regionStates = am.getRegionStates();
+      for (HRegionInfo hri: this.regionsOnCrashedServer) {
+        if (regionsInTransition.contains(hri)) continue;
+        String encodedName = hri.getEncodedName();
+        Lock lock = am.acquireRegionLock(encodedName);
+        try {
+          RegionState rit = regionStates.getRegionTransitionState(hri);
+          if (processDeadRegion(hri, am)) {
+            ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
+            if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
+              // If this region is in transition on the dead server, it must be
+              // opening or pending_open, which should have been covered by
+              // AM#cleanOutCrashedServerReferences
+              LOG.info("Skip assigning region " + hri.getRegionNameAsString()
+                + " because it has been opened in " + addressFromAM.getServerName());
+              continue;
+            }
+            if (rit != null) {
+              if (rit.getServerName() != null && !rit.isOnServer(this.serverName)) {
+                // Skip regions that are in transition on other server
+                LOG.info("Skip assigning region in transition on other server" + rit);
+                continue;
+              }
+              LOG.info("Reassigning region " + rit + " and clearing zknode if exists");
+              try {
+                // This clears out any RIT that might be sticking around.
+                ZKAssign.deleteNodeFailSilent(env.getMasterServices().getZooKeeper(), hri);
+              } catch (KeeperException e) {
+                // TODO: FIX!!!! ABORTING SERVER BECAUSE COULDN"T PURGE ZNODE. This is what we
+                // used to do but that doesn't make it right!!!
+                env.getMasterServices().abort("Unexpected error deleting RIT " + hri, e);
+                throw new IOException(e);
+              }
+              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
+            } else if (regionStates.isRegionInState(
+                hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) {
+              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
+            }
+            regionsToAssignAggregator.add(hri);
+          // TODO: The below else if is different in branch-1 from master branch.
+          } else if (rit != null) {
+            if ((rit.isPendingCloseOrClosing() || rit.isOffline())
+                && am.getTableStateManager().isTableState(hri.getTable(),
+                ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
+                am.getReplicasToClose().contains(hri)) {
+              // If the table was partially disabled and the RS went down, we should clear the
+              // RIT and remove the node for the region.
+              // The rit that we use may be stale in case the table was in DISABLING state
+              // but though we did assign we will not be clearing the znode in CLOSING state.
+              // Doing this will have no harm. See HBASE-5927
+              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
+              am.deleteClosingOrClosedNode(hri, rit.getServerName());
+              am.offlineDisabledRegion(hri);
+            } else {
+              LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
+                + rit + " not to be assigned by SSH of server " + serverName);
+            }
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+    return regionsToAssignAggregator;
+  }
+
+  /**
+   * Round-robin assign the passed regions.
+   * @return False when an IOException occurred during assign (caller retries); else true.
+   * @throws InterruptedIOException when the assign is interrupted.
+   */
+  private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
+  throws InterruptedIOException {
+    try {
+      env.getMasterServices().getAssignmentManager().assign(hris);
+      return true;
+    } catch (InterruptedException ie) {
+      LOG.error("Caught " + ie + " during round-robin assignment");
+      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
+    } catch (IOException ioe) {
+      LOG.info("Caught " + ioe + " during region assignment, will retry");
+      return false;
+    }
+  }
+
+  private boolean waitOnAssign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
+  throws InterruptedIOException {
+    int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
+    for (HRegionInfo hri: hris) {
+      // TODO: Blocks here.
+      if (!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(),
+          hri, timeout)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Server crash processing has no undo; this procedure cannot be rolled back.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
+  throws IOException {
+    // Can't rollback.
+    throw new UnsupportedOperationException("unhandled state=" + state);
+  }
+
+  /** Map a persisted numeric state id back to its {@code ServerCrashState} enum. */
+  @Override
+  protected ServerCrashState getState(int stateId) {
+    return ServerCrashState.valueOf(stateId);
+  }
+
+  /** @return The protobuf number used when persisting {@code state}. */
+  @Override
+  protected int getStateId(ServerCrashState state) {
+    return state.getNumber();
+  }
+
+  /** @return SERVER_CRASH_START, the entry state of this state machine. */
+  @Override
+  protected ServerCrashState getInitialState() {
+    return ServerCrashState.SERVER_CRASH_START;
+  }
+
+  /**
+   * Abort is not supported; crash processing must run to completion.
+   * @return Always false.
+   */
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    // TODO
+    return false;
+  }
+
+  /**
+   * Take the exclusive lock on the crashed server so only one procedure works on it at
+   * a time. Refuses while master startup has server crash processing disabled.
+   */
+  @Override
+  protected boolean acquireLock(final MasterProcedureEnv env) {
+    return env.getMasterServices().isServerCrashProcessingEnabled()
+        && env.getProcedureQueue().tryAcquireServerExclusiveLock(this);
+  }
+
+  /** Release the exclusive server lock taken in {@link #acquireLock(MasterProcedureEnv)}. */
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureQueue().releaseServerExclusiveLock(this);
+  }
+
+  /** Append a human-readable summary of this procedure's key fields to {@code sb}. */
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName())
+      .append(" serverName=").append(this.serverName)
+      .append(", shouldSplitWal=").append(shouldSplitWal)
+      .append(", carryingMeta=").append(carryingMeta);
+  }
+
+  /**
+   * Write this procedure's state to {@code stream} as a delimited
+   * ServerCrashStateData protobuf message. Must stay in sync with
+   * {@link #deserializeStateData(InputStream)}.
+   */
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    MasterProcedureProtos.ServerCrashStateData.Builder state =
+      MasterProcedureProtos.ServerCrashStateData.newBuilder().
+      setServerName(ProtobufUtil.toServerName(this.serverName)).
+      setDistributedLogReplay(this.distributedLogReplay).
+      setCarryingMeta(this.carryingMeta).
+      setShouldSplitWal(this.shouldSplitWal);
+    // Repeated fields are optional; only write them when non-empty.
+    if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
+      for (HRegionInfo hri: this.regionsOnCrashedServer) {
+        state.addRegionsOnCrashedServer(HRegionInfo.convert(hri));
+      }
+    }
+    if (this.regionsToAssign != null && !this.regionsToAssign.isEmpty()) {
+      for (HRegionInfo hri: this.regionsToAssign) {
+        state.addRegionsToAssign(HRegionInfo.convert(hri));
+      }
+    }
+    state.build().writeDelimitedTo(stream);
+  }
+
+  /**
+   * Restore this procedure's state from a delimited ServerCrashStateData protobuf
+   * message. Counterpart of {@link #serializeStateData(OutputStream)}.
+   */
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    MasterProcedureProtos.ServerCrashStateData state =
+      MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream);
+    this.serverName = ProtobufUtil.toServerName(state.getServerName());
+    this.distributedLogReplay = state.hasDistributedLogReplay()?
+      state.getDistributedLogReplay(): false;
+    this.carryingMeta = state.hasCarryingMeta()? state.getCarryingMeta(): false;
+    // shouldSplitWAL has a default over in pb so this invocation will always work.
+    this.shouldSplitWal = state.getShouldSplitWal();
+    int size = state.getRegionsOnCrashedServerCount();
+    if (size > 0) {
+      this.regionsOnCrashedServer = new HashSet<HRegionInfo>(size);
+      for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
+        this.regionsOnCrashedServer.add(HRegionInfo.convert(ri));
+      }
+    }
+    size = state.getRegionsToAssignCount();
+    if (size > 0) {
+      this.regionsToAssign = new ArrayList<HRegionInfo>(size);
+      // Fix: read from the regionsToAssign repeated field; previously this iterated
+      // getRegionsOnCrashedServerList() and rebuilt regionsToAssign from the wrong data.
+      for (RegionInfo ri: state.getRegionsToAssignList()) {
+        this.regionsToAssign.add(HRegionInfo.convert(ri));
+      }
+    }
+  }
+
+  /**
+   * Process a dead region from a dead RS. Checks if the region is disabled or
+   * disabling or if the region has a partially completed split.
+   * @param hri
+   * @param assignmentManager
+   * @return Returns true if specified region should be assigned, false if not.
+   * @throws IOException
+   */
+  private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager assignmentManager)
+  throws IOException {
+    boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
+    if (!tablePresent) {
+      LOG.info("The table " + hri.getTable() + " was deleted.  Hence not proceeding.");
+      return false;
+    }
+    // If table is not disabled but the region is offlined,
+    boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
+      ZooKeeperProtos.Table.State.DISABLED);
+    if (disabled){
+      LOG.info("The table " + hri.getTable() + " was disabled.  Hence not proceeding.");
+      return false;
+    }
+    if (hri.isOffline() && hri.isSplit()) {
+      // HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation.
+      // If the meta scanner saw the parent split, then it should see the daughters as assigned
+      // to the dead server. We don't have to do anything.
+      return false;
+    }
+    boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
+      ZooKeeperProtos.Table.State.DISABLING);
+    if (disabling) {
+      LOG.info("The table " + hri.getTable() + " is disabled.  Hence not assigning region" +
+        hri.getEncodedName());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * If hbase:meta is not assigned already, assign.
+   * @throws IOException
+   */
+  private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) throws IOException {
+    MasterServices services = env.getMasterServices();
+    int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, DEFAULT_RETRIES_ON_META);
+    // Just reuse same time as we have for short wait on meta. Adding another config is overkill.
+    long waitTime =
+      services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
+    int iFlag = 0;
+    while (true) {
+      try {
+        verifyAndAssignMeta(env);
+        break;
+      } catch (KeeperException e) {
+        services.abort("In server shutdown processing, assigning meta", e);
+        throw new IOException("Aborting", e);
+      } catch (Exception e) {
+        if (iFlag >= iTimes) {
+          services.abort("verifyAndAssignMeta failed after" + iTimes + " retries, aborting", e);
+          throw new IOException("Aborting", e);
+        }
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+          LOG.warn("Interrupted when is the thread sleep", e1);
+          Thread.currentThread().interrupt();
+          throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
+        }
+        iFlag++;
+      }
+    }
+  }
+
+  /**
+   * If hbase:meta is not assigned already, assign.
+   * @throws InterruptedException
+   * @throws IOException
+   * @throws KeeperException
+   */
+  private void verifyAndAssignMeta(final MasterProcedureEnv env)
+      throws InterruptedException, IOException, KeeperException {
+    MasterServices services = env.getMasterServices();
+    if (!isMetaAssignedQuickTest(env)) {
+      services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
+    } else if (serverName.equals(services.getMetaTableLocator().
+        getMetaRegionLocation(services.getZooKeeper()))) {
+      throw new IOException("hbase:meta is onlined on the dead server " + this.serverName);
+    } else {
+      LOG.info("Skip assigning hbase:meta because it is online at "
+          + services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper()));
+    }
+  }
+
+  /**
+   * A quick test that hbase:meta is assigned; blocks for short time only.
+   * @return True if hbase:meta location is available and verified as good.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env)
+  throws InterruptedException, IOException {
+    ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper();
+    MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator();
+    boolean metaAssigned = false;
+    // Is hbase:meta location available yet?
+    if (mtl.isLocationAvailable(zkw)) {
+      ClusterConnection connection = env.getMasterServices().getConnection();
+      // Is hbase:meta location good yet?
+      long timeout =
+        env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
+      if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) {
+        metaAssigned = true;
+      }
+    }
+    return metaAssigned;
+  }
+
+  /** @return Name of the crashed server this procedure is processing. */
+  @Override
+  public ServerName getServerName() {
+    return this.serverName;
+  }
+
+  /** @return True if the crashed server was carrying an hbase:meta table region. */
+  @Override
+  public boolean hasMetaTableRegion() {
+    return this.carryingMeta;
+  }
+
+  /**
+   * For this procedure, yield at end of each successful flow step so that all crashed servers
+   * can make progress rather than do the default which has each procedure running to completion
+   * before we move to the next. For crashed servers, especially if running with distributed log
+   * replay, we will want all servers to come along; we do not want the scenario where a server is
+   * stuck waiting for regions to online so it can replay edits.
+   */
+  @Override
+  protected boolean isYieldAfterSuccessfulFlowStateStep() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
new file mode 100644
index 0000000..5b0c45f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Implemented by procedures that operate on a server -- e.g. processing of a crashed
+ * server. The procedure runner consults this Interface when figuring locking and queuing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ServerProcedureInterface {
+  /**
+   * @return True if the server this procedure is processing hosts an hbase:meta
+   *   table region.
+   */
+  boolean hasMetaTableRegion();
+
+  /**
+   * @return Name of the server instance this procedure operates on.
+   */
+  ServerName getServerName();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
index 6928d02..cc088f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -45,4 +45,4 @@ public interface TableProcedureInterface {
    * @return the operation type that the procedure is executing.
    */
   TableOperationType getTableOperationType();
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index c69bd8f..0300c89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -183,12 +183,12 @@ public class TruncateTableProcedure
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
     if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "truncate table");
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableWrite(getTableName());
+    env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
   }
 
   @Override
@@ -287,4 +287,4 @@ public class TruncateTableProcedure
       });
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 22fdc78..94a193f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -351,7 +351,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private boolean disallowWritesInRecovering = false;
 
   // when a region is in recovering state, it can only accept writes not reads
-  private volatile boolean isRecovering = false;
+  private volatile boolean recovering = false;
 
   private volatile Optional<ConfigurationManager> configurationManager;
 
@@ -711,7 +711,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
       String encodedName = getRegionInfo().getEncodedName();
       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
-        this.isRecovering = true;
+        this.recovering = true;
         recoveringRegions.put(encodedName, this);
       }
     } else {
@@ -841,7 +841,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // overlaps used sequence numbers
     if (this.writestate.writesEnabled) {
       nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
-          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
+          .getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1));
     } else {
       nextSeqid++;
     }
@@ -1153,7 +1153,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Reset recovering state of current region
    */
   public void setRecovering(boolean newState) {
-    boolean wasRecovering = this.isRecovering;
+    boolean wasRecovering = this.recovering;
     // before we flip the recovering switch (enabling reads) we should write the region open
     // event to WAL if needed
     if (wal != null && getRegionServerServices() != null && !writestate.readOnly
@@ -1194,8 +1194,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
-    this.isRecovering = newState;
-    if (wasRecovering && !isRecovering) {
+    this.recovering = newState;
+    if (wasRecovering && !recovering) {
       // Call only when wal replay is over.
       coprocessorHost.postLogReplay();
     }
@@ -1203,7 +1203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public boolean isRecovering() {
-    return this.isRecovering;
+    return this.recovering;
   }
 
   @Override
@@ -6212,7 +6212,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.openSeqNum = initialize(reporter);
     this.setSequenceId(openSeqNum);
     if (wal != null && getRegionServerServices() != null && !writestate.readOnly
-        && !isRecovering) {
+        && !recovering) {
       // Only write the region open event marker to WAL if (1) we are not read-only
       // (2) dist log replay is off or we are not recovering. In case region is
       // recovering, the open event will be written at setRecovering(false)

http://git-wip-us.apache.org/repos/asf/hbase/blob/94f0ee7e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
index 0fffcc6..1360fb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
@@ -170,7 +170,7 @@ public class FSHDFSUtils extends FSUtils {
   boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
       final Configuration conf, final CancelableProgressable reporter)
   throws IOException {
-    LOG.info("Recovering lease on dfs file " + p);
+    LOG.info("Recover lease on dfs file " + p);
     long startWaiting = EnvironmentEdgeManager.currentTime();
     // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
     // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
@@ -259,7 +259,7 @@ public class FSHDFSUtils extends FSUtils {
     boolean recovered = false;
     try {
       recovered = dfs.recoverLease(p);
-      LOG.info("recoverLease=" + recovered + ", " +
+      LOG.info((recovered? "Recovered lease, ": "Failed to recover lease, ") +
         getLogMessageDetail(nbAttempt, p, startWaiting));
     } catch (IOException e) {
       if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {


Mime
View raw message