hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [8/9] hbase git commit: HBASE-14837 Procedure v2 - Procedure Queue Improvement
Date Thu, 14 Jan 2016 17:26:04 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
new file mode 100644
index 0000000..9a3714f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -0,0 +1,1241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
+
+/**
+ * ProcedureRunnableSet for the Master Procedures.
+ * This RunnableSet tries to provide to the ProcedureExecutor procedures
+ * that can be executed without having to wait on a lock.
+ * Most of the master operations can be executed concurrently, if they
+ * are operating on different tables (e.g. two create table can be performed
+ * at the same time, assuming table A and table B) or against two different servers; say
+ * two servers that crashed at about the same time.
+ *
+ * <p>Each procedure should implement an interface providing information for this queue.
+ * For example, table related procedures should implement TableProcedureInterface.
+ * Each procedure will be pushed in its own queue, and based on the operation type
+ * we may take smarter decisions, e.g. we can abort all the operations preceding
+ * a delete table, or similar.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MasterProcedureScheduler implements ProcedureRunnableSet {
+  private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class);
+
+  private final TableLockManager lockManager;
+  private final ReentrantLock schedLock = new ReentrantLock();
+  private final Condition schedWaitCond = schedLock.newCondition();
+
+  private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
+  private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
+  private int queueSize = 0;
+
+  private final Object[] serverBuckets = new Object[128];
+  private Queue<String> namespaceMap = null;
+  private Queue<TableName> tableMap = null;
+
+  private final int metaTablePriority;
+  private final int userTablePriority;
+  private final int sysTablePriority;
+
+  // TODO: metrics
+  private long pollCalls = 0;
+  private long nullPollCalls = 0;
+
+  /**
+   * Creates the scheduler and reads the queue priority of each table category
+   * (meta, system, user) from the configuration.
+   * @param conf configuration holding the optional priority overrides
+   * @param lockManager lock manager handed to the table queues when table locks are taken
+   */
+  public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) {
+    // TODO: should this be part of the HTD?
+    this.userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
+    this.sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
+    this.metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
+
+    this.lockManager = lockManager;
+  }
+
+  /**
+   * Enqueues the procedure at the head of the table/server queue it belongs to.
+   * @param proc the procedure to enqueue
+   */
+  @Override
+  public void addFront(Procedure proc) {
+    final boolean atFront = true;
+    doAdd(proc, atFront);
+  }
+
+  /**
+   * Enqueues the procedure at the tail of the table/server queue it belongs to.
+   * @param proc the procedure to enqueue
+   */
+  @Override
+  public void addBack(Procedure proc) {
+    final boolean atFront = false;
+    doAdd(proc, atFront);
+  }
+
+  /**
+   * Puts a yielding procedure back in its queue: table procedures are re-inserted
+   * at the front, every other procedure at the back.
+   * @param proc the procedure that yielded
+   */
+  @Override
+  public void yield(final Procedure proc) {
+    final boolean addFront = isTableProcedure(proc);
+    doAdd(proc, addFront);
+  }
+
+  private void doAdd(final Procedure proc, final boolean addFront) {
+    schedLock.lock();
+    try {
+      if (isTableProcedure(proc)) {
+        doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
+      } else if (isServerProcedure(proc)) {
+        doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
+      } else {
+        // TODO: at the moment we only have Table and Server procedures
+        // if you are implementing a non-table/non-server procedure, you have two options: create
+        // a group for all the non-table/non-server procedures or try to find a key for your
+        // non-table/non-server procedures and implement something similar to the TableRunQueue.
+        throw new UnsupportedOperationException(
+          "RQs for non-table/non-server procedures are not implemented yet");
+      }
+      schedWaitCond.signal();
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
+      final Queue<T> queue, final Procedure proc, final boolean addFront) {
+    queue.add(proc, addFront);
+    if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
+      if (queue.size() == 1 && !IterableList.isLinked(queue)) {
+        fairq.add(queue);
+      }
+      queueSize++;
+    }
+  }
+
+  /**
+   * Polls the next procedure; a negative wait makes {@link #poll(long)} block on the
+   * scheduler condition without a timeout when the scheduler is empty.
+   * @return the next procedure, or null if none was available
+   */
+  @Override
+  public Procedure poll() {
+    final long noTimeout = -1;
+    return poll(noTimeout);
+  }
+
+  /**
+   * Polls the next procedure, giving server queues precedence over table queues.
+   * If the scheduler is empty the call waits once on the scheduler condition and gives up
+   * when the scheduler is still empty afterwards.
+   * @param waitNsec nanoseconds to wait when the scheduler is empty;
+   *     a negative value waits without a timeout
+   * @return the next procedure, or null if nothing was available or the wait was interrupted
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  Procedure poll(long waitNsec) {
+    schedLock.lock();
+    try {
+      if (queueSize == 0) {
+        if (waitNsec >= 0) {
+          schedWaitCond.awaitNanos(waitNsec);
+        } else {
+          schedWaitCond.await();
+        }
+        if (queueSize == 0) {
+          return null;
+        }
+      }
+
+      // For now, let server handling have precedence over table handling; presumption is that it
+      // is more important handling crashed servers than it is running the
+      // enabling/disabling tables, etc.
+      Procedure next = doPoll(serverRunQueue);
+      if (next == null) {
+        next = doPoll(tableRunQueue);
+      }
+
+      // update metrics
+      pollCalls++;
+      if (next == null) {
+        nullPollCalls++;
+      }
+      return next;
+    } catch (InterruptedException e) {
+      // only the waits above can be interrupted, so nothing was polled yet
+      Thread.currentThread().interrupt();
+      return null;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
+    Queue<T> rq = fairq.poll();
+    if (rq == null || !rq.isAvailable()) {
+      return null;
+    }
+
+    assert !rq.isSuspended() : "rq=" + rq + " is suspended";
+    Procedure pollResult = rq.poll();
+    this.queueSize--;
+    if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) {
+      removeFromRunQueue(fairq, rq);
+    }
+    return pollResult;
+  }
+
+  /**
+   * Drops every server and table queue known to the scheduler.
+   * NOTE: USED ONLY FOR TESTING
+   */
+  @Override
+  public void clear() {
+    schedLock.lock();
+    try {
+      // Remove Servers
+      final int bucketCount = serverBuckets.length;
+      for (int bucket = 0; bucket < bucketCount; bucket++) {
+        clear((ServerQueue)serverBuckets[bucket], serverRunQueue);
+        serverBuckets[bucket] = null;
+      }
+
+      // Remove Tables
+      clear(tableMap, tableRunQueue);
+      tableMap = null;
+
+      assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) {
+    while (treeMap != null) {
+      Queue<T> node = AvlTree.getFirst(treeMap);
+      assert !node.isSuspended() : "can't clear suspended " + node.getKey();
+      treeMap = AvlTree.remove(treeMap, node.getKey());
+      removeFromRunQueue(fairq, node);
+    }
+  }
+
+  /** Wakes up every thread waiting on the scheduler condition. */
+  @Override
+  public void signalAll() {
+    final ReentrantLock lock = schedLock;
+    lock.lock();
+    try {
+      schedWaitCond.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @return the scheduler size counter, read while holding the scheduler lock
+   */
+  @Override
+  public int size() {
+    final ReentrantLock lock = schedLock;
+    lock.lock();
+    try {
+      return this.queueSize;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Cleanup hook for a completed procedure. Only table procedures are handled: when the
+   * outcome implies that the table is gone, the table is marked as deleted.
+   * @param proc the completed procedure
+   */
+  @Override
+  public void completionCleanup(Procedure proc) {
+    if (!(proc instanceof TableProcedureInterface)) {
+      // No cleanup for ServerProcedureInterface types, yet.
+      return;
+    }
+
+    final TableProcedureInterface tableProc = (TableProcedureInterface)proc;
+    final boolean tableDeleted;
+    if (!proc.hasException()) {
+      // the table was deleted
+      tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE);
+    } else {
+      final IOException cause = proc.getException().unwrapRemoteException();
+      final boolean isCreate = (tableProc.getTableOperationType() == TableOperationType.CREATE);
+      // a failed create leaves a table behind only if it failed because the table already
+      // exists; any other operation implies a missing table only on TableNotFoundException
+      tableDeleted = isCreate
+          ? !(cause instanceof TableExistsException)
+          : (cause instanceof TableNotFoundException);
+    }
+
+    if (tableDeleted) {
+      markTableAsDeleted(tableProc.getTableName());
+    }
+  }
+
+  /**
+   * Links a non-empty, not yet linked queue into the run-queue and adds all of its
+   * procedures to the scheduler size.
+   * @param fairq the run-queue to link the queue in
+   * @param queue the queue to make runnable
+   */
+  private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
+    if (IterableList.isLinked(queue) || queue.isEmpty()) {
+      return;
+    }
+    fairq.add(queue);
+    queueSize += queue.size();
+  }
+
+  /**
+   * Unlinks a linked queue from the run-queue and subtracts all of its procedures
+   * from the scheduler size. Does nothing if the queue is not linked.
+   * @param fairq the run-queue the queue is linked in
+   * @param queue the queue to take out of the run-queue
+   */
+  private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
+    if (IterableList.isLinked(queue)) {
+      fairq.remove(queue);
+      queueSize -= queue.size();
+    }
+  }
+
+  // ============================================================================
+  //  TODO: Metrics
+  // ============================================================================
+  /**
+   * @return the number of polls that went on to inspect the run-queues; the counter is
+   *     read without taking the scheduler lock
+   */
+  public long getPollCalls() {
+    return this.pollCalls;
+  }
+
+  /**
+   * @return the number of counted polls that found no procedure; the counter is
+   *     read without taking the scheduler lock
+   */
+  public long getNullPollCalls() {
+    return this.nullPollCalls;
+  }
+
+  // ============================================================================
+  //  Event Helpers
+  // ============================================================================
+  /**
+   * Same as {@link #waitEvent(ProcedureEvent, Procedure, boolean)} with suspendQueue = false.
+   * @param event the event to wait for
+   * @param procedure the procedure that has to wait
+   * @return true if the procedure has to wait, false if the event is already ready
+   */
+  public boolean waitEvent(ProcedureEvent event, Procedure procedure) {
+    final boolean suspendQueue = false;
+    return waitEvent(event, procedure, suspendQueue);
+  }
+
+  /**
+   * Suspends the queue of the given procedure until the event is woken up,
+   * unless the event is already ready.
+   * @param event the event to wait for
+   * @param procedure the procedure that has to wait
+   * @param suspendQueue currently ignored: the whole queue is always suspended
+   * @return true if the queue was handed to the event, false if the event is already ready
+   * @throws UnsupportedOperationException if procedure is neither a table nor a server procedure
+   */
+  public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
+    synchronized (event) {
+      if (event.isReady()) {
+        return false;
+      }
+
+      // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue
+
+      final boolean tableProc = isTableProcedure(procedure);
+      if (!tableProc && !isServerProcedure(procedure)) {
+        // TODO: at the moment we only have Table and Server procedures
+        // if you are implementing a non-table/non-server procedure, you have two options: create
+        // a group for all the non-table/non-server procedures or try to find a key for your
+        // non-table/non-server procedures and implement something similar to the TableRunQueue.
+        throw new UnsupportedOperationException(
+          "RQs for non-table/non-server procedures are not implemented yet");
+      }
+
+      if (tableProc) {
+        suspendTableQueue(event, getTableName(procedure));
+      } else {
+        suspendServerQueue(event, getServerName(procedure));
+      }
+      return true;
+    }
+  }
+
+  private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
+    schedLock.lock();
+    try {
+      TableQueue queue = getTableQueue(tableName);
+      if (!queue.setSuspended(true)) return;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Suspend table queue " + tableName);
+      }
+      removeFromRunQueue(tableRunQueue, queue);
+      event.suspendTableQueue(queue);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
+    schedLock.lock();
+    try {
+      // TODO: This will change once we have the new AM
+      ServerQueue queue = getServerQueue(serverName);
+      if (!queue.setSuspended(true)) return;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Suspend server queue " + serverName);
+      }
+      removeFromRunQueue(serverRunQueue, queue);
+      event.suspendServerQueue(queue);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  /**
+   * Marks the event as not ready, so that later waitEvent() calls suspend their queue.
+   * @param event the event to mark as not ready
+   */
+  public void suspend(ProcedureEvent event) {
+    synchronized (event) {
+      event.setReady(false);
+      final boolean debug = LOG.isDebugEnabled();
+      if (debug) {
+        LOG.debug("Suspend event " + event);
+      }
+    }
+  }
+
+  /**
+   * Marks the event as ready and puts every table and server queue waiting on it back in
+   * its run-queue. Waiting pollers are signalled only if the scheduler is not empty:
+   * all of them when more than one procedure is runnable, a single one otherwise.
+   * @param event the event to wake
+   */
+  public void wake(ProcedureEvent event) {
+    synchronized (event) {
+      event.setReady(true);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Wake event " + event);
+      }
+
+      schedLock.lock();
+      try {
+        while (event.hasWaitingTables()) {
+          final Queue<TableName> tableQueue = event.popWaitingTable();
+          addToRunQueue(tableRunQueue, tableQueue);
+        }
+        // TODO: This will change once we have the new AM
+        while (event.hasWaitingServers()) {
+          final Queue<ServerName> serverQueue = event.popWaitingServer();
+          addToRunQueue(serverRunQueue, serverQueue);
+        }
+
+        if (queueSize == 1) {
+          schedWaitCond.signal();
+        } else if (queueSize > 1) {
+          schedWaitCond.signalAll();
+        }
+      } finally {
+        schedLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * An event procedures can wait on. While the event is not ready, it holds the
+   * table and server queues that were suspended on it, chained as IterableList lists.
+   */
+  public static class ProcedureEvent {
+    private final String description;
+
+    // heads of the lists of queues suspended on this event, null when nothing is waiting
+    private Queue<ServerName> waitingServers = null;
+    private Queue<TableName> waitingTables = null;
+    private boolean ready = false;
+
+    /**
+     * @param description text reported by toString()
+     */
+    public ProcedureEvent(String description) {
+      this.description = description;
+    }
+
+    /** @return the current value of the ready flag */
+    public synchronized boolean isReady() {
+      return this.ready;
+    }
+
+    private synchronized void setReady(boolean isReady) {
+      ready = isReady;
+    }
+
+    /** Appends the table queue to the list of queues waiting on this event. */
+    private void suspendTableQueue(Queue<TableName> queue) {
+      this.waitingTables = IterableList.append(this.waitingTables, queue);
+    }
+
+    /** Appends the server queue to the list of queues waiting on this event. */
+    private void suspendServerQueue(Queue<ServerName> queue) {
+      this.waitingServers = IterableList.append(this.waitingServers, queue);
+    }
+
+    private boolean hasWaitingTables() {
+      return this.waitingTables != null;
+    }
+
+    /** Detaches the head of the waiting-tables list and clears its suspended flag. */
+    private Queue<TableName> popWaitingTable() {
+      final Queue<TableName> head = this.waitingTables;
+      this.waitingTables = IterableList.remove(head, head);
+      head.setSuspended(false);
+      return head;
+    }
+
+    private boolean hasWaitingServers() {
+      return this.waitingServers != null;
+    }
+
+    /** Detaches the head of the waiting-servers list and clears its suspended flag. */
+    private Queue<ServerName> popWaitingServer() {
+      final Queue<ServerName> head = this.waitingServers;
+      this.waitingServers = IterableList.remove(head, head);
+      head.setSuspended(false);
+      return head;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("ProcedureEvent(%s)", description);
+    }
+  }
+
+  // ============================================================================
+  //  Table Queue Lookup Helpers
+  // ============================================================================
+  private TableQueue getTableQueueWithLock(TableName tableName) {
+    schedLock.lock();
+    try {
+      return getTableQueue(tableName);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the queue of the table, creating and registering a new one in the table map
+   * when the table has no queue yet. This method itself takes no lock.
+   * @param tableName the table to look up
+   * @return the queue of the table, never null
+   */
+  private TableQueue getTableQueue(TableName tableName) {
+    Queue<TableName> node = AvlTree.get(tableMap, tableName);
+    if (node == null) {
+      node = new TableQueue(tableName, getTablePriority(tableName));
+      tableMap = AvlTree.insert(tableMap, node);
+    }
+    return (TableQueue)node;
+  }
+
+  private void removeTableQueue(TableName tableName) {
+    tableMap = AvlTree.remove(tableMap, tableName);
+  }
+
+  /**
+   * Picks the queue priority of a table: the meta table priority for the meta table,
+   * the system table priority for any other system table, the user table priority otherwise.
+   * @param tableName the table to classify
+   * @return the configured priority for the category of the table
+   */
+  private int getTablePriority(TableName tableName) {
+    if (tableName.equals(TableName.META_TABLE_NAME)) {
+      return metaTablePriority;
+    }
+    return tableName.isSystemTable() ? sysTablePriority : userTablePriority;
+  }
+
+  private static boolean isTableProcedure(Procedure proc) {
+    return proc instanceof TableProcedureInterface;
+  }
+
+  private static TableName getTableName(Procedure proc) {
+    return ((TableProcedureInterface)proc).getTableName();
+  }
+
+  // ============================================================================
+  //  Server Queue Lookup Helpers
+  // ============================================================================
+  private ServerQueue getServerQueueWithLock(ServerName serverName) {
+    schedLock.lock();
+    try {
+      return getServerQueue(serverName);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private ServerQueue getServerQueue(ServerName serverName) {
+    int index = getBucketIndex(serverBuckets, serverName.hashCode());
+    Queue<ServerName> root = getTreeRoot(serverBuckets, index);
+    Queue<ServerName> node = AvlTree.get(root, serverName);
+    if (node != null) return (ServerQueue)node;
+
+    node = new ServerQueue(serverName);
+    serverBuckets[index] = AvlTree.insert(root, node);
+    return (ServerQueue)node;
+  }
+
+  private void removeServerQueue(ServerName serverName) {
+    int index = getBucketIndex(serverBuckets, serverName.hashCode());
+    serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) {
+    return (Queue<T>) buckets[index];
+  }
+
+  /**
+   * Maps a hash code onto a slot of the given bucket array.
+   * The absolute value is taken after the remainder, not before: Math.abs(Integer.MIN_VALUE)
+   * is still negative, so taking it first yields a negative index whenever the bucket count
+   * does not divide 2^31. Every other hash code maps to the same slot as before.
+   * @param buckets the bucket array; only its length is used
+   * @param hashCode hash code of the key
+   * @return an index in the range [0, buckets.length)
+   */
+  private static int getBucketIndex(Object[] buckets, int hashCode) {
+    // the remainder lies in (-length, length), so Math.abs() cannot overflow here
+    return Math.abs(hashCode % buckets.length);
+  }
+
+  private static boolean isServerProcedure(Procedure proc) {
+    return proc instanceof ServerProcedureInterface;
+  }
+
+  private static ServerName getServerName(Procedure proc) {
+    return ((ServerProcedureInterface)proc).getServerName();
+  }
+
+  // ============================================================================
+  //  Table and Server Queue Implementation
+  // ============================================================================
+  /** Queue of the procedures operating on one server. */
+  public static class ServerQueue extends QueueImpl<ServerName> {
+    public ServerQueue(ServerName serverName) {
+      super(serverName);
+    }
+
+    /**
+     * @param proc a procedure implementing ServerProcedureInterface
+     * @return true for the crash handler operation
+     * @throws UnsupportedOperationException for any other server operation type
+     */
+    public boolean requireExclusiveLock(Procedure proc) {
+      final ServerProcedureInterface serverProc = (ServerProcedureInterface)proc;
+      switch (serverProc.getServerOperationType()) {
+        case CRASH_HANDLER:
+          return true;
+        default:
+          throw new UnsupportedOperationException(
+            "unexpected type " + serverProc.getServerOperationType());
+      }
+    }
+  }
+
+  public static class TableQueue extends QueueImpl<TableName> {
+    private TableLock tableLock = null;
+
+    public TableQueue(TableName tableName, int priority) {
+      super(tableName, priority);
+    }
+
+    // TODO: We can abort pending/in-progress operation if the new call is
+    //       something like drop table. We can Override addBack(),
+    //       check the type and abort all the in-flight procedurs.
+    private boolean canAbortPendingOperations(Procedure proc) {
+      TableProcedureInterface tpi = (TableProcedureInterface)proc;
+      switch (tpi.getTableOperationType()) {
+        case DELETE:
+          return true;
+        default:
+          return false;
+      }
+    }
+
+    public boolean requireExclusiveLock(Procedure proc) {
+      TableProcedureInterface tpi = (TableProcedureInterface)proc;
+      switch (tpi.getTableOperationType()) {
+        case CREATE:
+        case DELETE:
+        case DISABLE:
+        case EDIT:
+        case ENABLE:
+          return true;
+        case READ:
+          return false;
+        default:
+          break;
+      }
+      throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
+    }
+
+    private synchronized boolean trySharedLock(final TableLockManager lockManager,
+        final String purpose) {
+      if (hasExclusiveLock()) return false;
+
+      // Take zk-read-lock
+      TableName tableName = getKey();
+      tableLock = lockManager.readLock(tableName, purpose);
+      try {
+        tableLock.acquire();
+      } catch (IOException e) {
+        LOG.error("failed acquire read lock on " + tableName, e);
+        tableLock = null;
+        return false;
+      }
+
+      trySharedLock();
+      return true;
+    }
+
+    private synchronized void releaseSharedLock(final TableLockManager lockManager) {
+      releaseTableLock(lockManager, isSingleSharedLock());
+      releaseSharedLock();
+    }
+
+    private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
+        final String purpose) {
+      // Take zk-write-lock
+      TableName tableName = getKey();
+      tableLock = lockManager.writeLock(tableName, purpose);
+      try {
+        tableLock.acquire();
+      } catch (IOException e) {
+        LOG.error("failed acquire write lock on " + tableName, e);
+        tableLock = null;
+        return false;
+      }
+      return true;
+    }
+
+    private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) {
+      releaseTableLock(lockManager, true);
+    }
+
+    private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
+      for (int i = 0; i < 3; ++i) {
+        try {
+          tableLock.release();
+          if (reset) {
+            tableLock = null;
+          }
+          break;
+        } catch (IOException e) {
+          LOG.warn("Could not release the table write-lock", e);
+        }
+      }
+    }
+  }
+
+  // ============================================================================
+  //  Locking Helpers
+  // ============================================================================
+  /**
+   * Try to acquire the exclusive lock on the specified table.
+   * other operations in the table-queue will be executed after the lock is released.
+   * The in-memory exclusive lock is taken first, under the scheduler lock; the table
+   * write lock is then requested outside of the scheduler lock, and the in-memory lock
+   * is rolled back if that request fails.
+   * @param table Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return true if we were able to acquire the lock on the table, otherwise false.
+   */
+  public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
+    final TableQueue queue;
+    schedLock.lock();
+    try {
+      queue = getTableQueue(table);
+      if (!queue.tryExclusiveLock()) {
+        return false;
+      }
+      removeFromRunQueue(tableRunQueue, queue);
+    } finally {
+      // always give the scheduler lock back, even if one of the calls above throws
+      schedLock.unlock();
+    }
+
+    // Zk lock is expensive...
+    final boolean hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
+    if (!hasXLock) {
+      schedLock.lock();
+      try {
+        queue.releaseExclusiveLock();
+        addToRunQueue(tableRunQueue, queue);
+      } finally {
+        schedLock.unlock();
+      }
+    }
+    return hasXLock;
+  }
+
+  /**
+   * Release the exclusive lock taken with tryAcquireTableExclusiveLock().
+   * The table write lock is released outside of the scheduler lock; the in-memory
+   * exclusive lock is then released and the queue is put back in the run-queue.
+   * @param table the name of the table that has the exclusive lock
+   */
+  public void releaseTableExclusiveLock(final TableName table) {
+    final TableQueue queue = getTableQueueWithLock(table);
+
+    // Zk lock is expensive...
+    queue.releaseZkExclusiveLock(lockManager);
+
+    schedLock.lock();
+    try {
+      queue.releaseExclusiveLock();
+      addToRunQueue(tableRunQueue, queue);
+    } finally {
+      // always give the scheduler lock back, even if one of the calls above throws
+      schedLock.unlock();
+    }
+  }
+
+  /**
+   * Try to acquire the shared lock on the specified table.
+   * Fails if the table queue is exclusively locked or the table read lock
+   * cannot be acquired.
+   * @param table Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return true if we were able to acquire the lock on the table, otherwise false.
+   */
+  public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
+    final TableQueue queue = getTableQueueWithLock(table);
+    return queue.trySharedLock(lockManager, purpose);
+  }
+
+  /**
+   * Release the shared lock taken with tryAcquireTableSharedLock()
+   * @param table the name of the table that has the shared lock
+   */
+  public void releaseTableSharedLock(final TableName table) {
+    final TableQueue queue = getTableQueueWithLock(table);
+    queue.releaseSharedLock(lockManager);
+  }
+
+  /**
+   * Tries to remove the queue and the table-lock of the specified table.
+   * The removal happens only if the queue is empty and its delete lock can be taken;
+   * if there are new operations pending (e.g. a new create), nothing is removed.
+   * @param table the name of the table that should be marked as deleted
+   * @return true if deletion succeeded, false otherwise meaning that there are
+   *     other new operations pending for that table (e.g. a new create).
+   */
+  protected boolean markTableAsDeleted(final TableName table) {
+    schedLock.lock();
+    try {
+      final TableQueue queue = getTableQueue(table);
+      if (queue == null) {
+        return true;
+      }
+
+      if (!queue.isEmpty() || !queue.acquireDeleteLock()) {
+        // TODO: If there are no create, we can drop all the other ops
+        return false;
+      }
+
+      // remove the table from the run-queue and the map
+      if (IterableList.isLinked(queue)) {
+        tableRunQueue.remove(queue);
+      }
+
+      // Remove the table lock
+      try {
+        lockManager.tableDeleted(table);
+      } catch (IOException e) {
+        LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
+      }
+
+      removeTableQueue(table);
+      return true;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  // ============================================================================
+  //  Server Locking Helpers
+  // ============================================================================
+  /**
+   * Try to acquire the exclusive lock on the specified server.
+   * On success the server queue is taken out of the run-queue.
+   * @see #releaseServerExclusiveLock(ServerName)
+   * @param serverName Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
+   */
+  public boolean tryAcquireServerExclusiveLock(final ServerName serverName) {
+    schedLock.lock();
+    try {
+      final ServerQueue queue = getServerQueue(serverName);
+      final boolean locked = queue.tryExclusiveLock();
+      if (locked) {
+        removeFromRunQueue(serverRunQueue, queue);
+      }
+      return locked;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  /**
+   * Release the exclusive lock and put the server queue back in the run-queue.
+   * @see #tryAcquireServerExclusiveLock(ServerName)
+   * @param serverName the server that has the exclusive lock
+   */
+  public void releaseServerExclusiveLock(final ServerName serverName) {
+    final ReentrantLock lock = schedLock;
+    lock.lock();
+    try {
+      final ServerQueue serverQueue = getServerQueue(serverName);
+      serverQueue.releaseExclusiveLock();
+      addToRunQueue(serverRunQueue, serverQueue);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Try to acquire the shared lock on the specified server.
+   * Fails if the server queue is exclusively locked.
+   * @see #releaseServerSharedLock(ServerName)
+   * @param serverName Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
+   */
+  public boolean tryAcquireServerSharedLock(final ServerName serverName) {
+    final ServerQueue queue = getServerQueueWithLock(serverName);
+    return queue.trySharedLock();
+  }
+
+  /**
+   * Release one shared lock held on the specified server.
+   * @see #tryAcquireServerSharedLock(ServerName)
+   * @param serverName the server that has the shared lock
+   */
+  public void releaseServerSharedLock(final ServerName serverName) {
+    final ServerQueue queue = getServerQueueWithLock(serverName);
+    queue.releaseSharedLock();
+  }
+
+  // ============================================================================
+  //  Generic Helpers
+  // ============================================================================
+  /** Operations the scheduler needs from a per-key queue of procedures. */
+  private static interface QueueInterface {
+    // state checks
+    boolean isAvailable();
+    boolean isSuspended();
+    boolean isEmpty();
+    int size();
+
+    // content
+    void add(Procedure proc, boolean addFront);
+    Procedure poll();
+
+    // locking policy for a polled procedure
+    boolean requireExclusiveLock(Procedure proc);
+  }
+
+  /**
+   * Base class of the per-key queues. Besides the key and the priority, a queue carries
+   * its suspended flag, its in-memory shared/exclusive lock state and the link fields
+   * used to chain it in trees and lists.
+   */
+  private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface {
+    // tree links (presumably maintained by AvlTree)
+    private Queue<TKey> avlRight = null;
+    private Queue<TKey> avlLeft = null;
+    private int avlHeight = 1;
+
+    // list links (presumably maintained by IterableList)
+    private Queue<TKey> iterNext = null;
+    private Queue<TKey> iterPrev = null;
+    private boolean suspended = false;
+
+    private boolean exclusiveLock = false;
+    private int sharedLock = 0;
+
+    private final TKey key;
+    private final int priority;
+
+    /** Creates a queue with priority 1. */
+    public Queue(TKey key) {
+      this(key, 1);
+    }
+
+    public Queue(TKey key, int priority) {
+      this.priority = priority;
+      this.key = key;
+    }
+
+    protected TKey getKey() {
+      return this.key;
+    }
+
+    protected int getPriority() {
+      return this.priority;
+    }
+
+    /**
+     * True if the queue is not in the run-queue and it is owned by an event.
+     */
+    public boolean isSuspended() {
+      return this.suspended;
+    }
+
+    /**
+     * Updates the suspended flag.
+     * @return true if the flag changed, false if it already had the requested value
+     */
+    protected boolean setSuspended(boolean isSuspended) {
+      final boolean changed = (this.suspended != isSuspended);
+      this.suspended = isSuspended;
+      return changed;
+    }
+
+    // ======================================================================
+    //  Read/Write Locking helpers
+    // ======================================================================
+    /** @return true if the exclusive lock or at least one shared lock is held */
+    public synchronized boolean isLocked() {
+      if (hasExclusiveLock()) {
+        return true;
+      }
+      return sharedLock > 0;
+    }
+
+    public synchronized boolean hasExclusiveLock() {
+      return exclusiveLock;
+    }
+
+    /** Takes one more shared lock, unless the exclusive lock is held. */
+    public synchronized boolean trySharedLock() {
+      final boolean granted = !hasExclusiveLock();
+      if (granted) {
+        sharedLock++;
+      }
+      return granted;
+    }
+
+    public synchronized void releaseSharedLock() {
+      sharedLock -= 1;
+    }
+
+    protected synchronized boolean isSingleSharedLock() {
+      return 1 == sharedLock;
+    }
+
+    /** Takes the exclusive lock, unless any shared or exclusive lock is held. */
+    public synchronized boolean tryExclusiveLock() {
+      final boolean granted = !isLocked();
+      if (granted) {
+        exclusiveLock = true;
+      }
+      return granted;
+    }
+
+    public synchronized void releaseExclusiveLock() {
+      this.exclusiveLock = false;
+    }
+
+    /** The delete lock is the exclusive lock. */
+    public synchronized boolean acquireDeleteLock() {
+      return tryExclusiveLock();
+    }
+
+    // This should go away when we have the new AM and its events
+    // and we move xlock to the lock-event-queue.
+    public synchronized boolean isAvailable() {
+      if (exclusiveLock) {
+        return false;
+      }
+      return !isEmpty();
+    }
+
+    // ======================================================================
+    //  Generic Helpers
+    // ======================================================================
+    public int compareKey(TKey cmpKey) {
+      return this.key.compareTo(cmpKey);
+    }
+
+    public int compareTo(Queue<TKey> other) {
+      final TKey otherKey = other.key;
+      return compareKey(otherKey);
+    }
+
+    @Override
+    public String toString() {
+      final String simpleName = getClass().getSimpleName();
+      return String.format("%s(%s)", simpleName, key);
+    }
+  }
+
+  // ======================================================================
+  //  Helper Data Structures
+  // ======================================================================
+  /**
+   * {@link Queue} specialization that keeps its procedures in an {@link ArrayDeque}.
+   * Procedures can be added at either end and are polled from the front.
+   */
+  private static abstract class QueueImpl<TKey extends Comparable<TKey>> extends Queue<TKey> {
+    private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
+
+    public QueueImpl(TKey key) {
+      super(key);
+    }
+
+    public QueueImpl(TKey key, int priority) {
+      super(key, priority);
+    }
+
+    /**
+     * Adds a procedure to this queue.
+     * @param proc the procedure to add
+     * @param addToFront true to insert at the front, false to insert at the back
+     */
+    public void add(final Procedure proc, final boolean addToFront) {
+      if (!addToFront) {
+        addBack(proc);
+        return;
+      }
+      addFront(proc);
+    }
+
+    /** Inserts the procedure at the front of the deque. */
+    protected void addFront(final Procedure proc) {
+      runnables.addFirst(proc);
+    }
+
+    /** Inserts the procedure at the back of the deque. */
+    protected void addBack(final Procedure proc) {
+      runnables.addLast(proc);
+    }
+
+    /**
+     * @return the procedure removed from the front of the deque, or null if the deque is empty
+     */
+    @Override
+    public Procedure poll() {
+      return runnables.pollFirst();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return runnables.isEmpty();
+    }
+
+    /**
+     * @return the number of procedures currently in the deque
+     */
+    public int size() {
+      return runnables.size();
+    }
+  }
+
+  /**
+   * Round-robin selector over a circular list of {@link Queue}s.
+   * The current queue is handed out by {@link #poll()} for a number of consecutive calls
+   * (its quantum, derived from the queue priority) before the selector moves to the next one.
+   */
+  private static class FairQueue<T extends Comparable<T>> {
+    /** Multiplier applied to a queue priority to compute the quantum of that queue. */
+    private final int quantum;
+
+    /** Queue currently handed out by poll(), or null when nothing is selected. */
+    private Queue<T> currentQueue = null;
+    /** Head of the circular list of registered queues. */
+    private Queue<T> queueHead = null;
+    /** Number of polls the current queue still has before the selector advances. */
+    private int currentQuantum = 0;
+
+    public FairQueue() {
+      this(1);
+    }
+
+    public FairQueue(int quantum) {
+      this.quantum = quantum;
+    }
+
+    /**
+     * Appends the queue to the circular list. If no queue is currently selected,
+     * the list head becomes the current queue.
+     */
+    public void add(Queue<T> queue) {
+      queueHead = IterableList.append(queueHead, queue);
+      if (currentQueue == null) setNextQueue(queueHead);
+    }
+
+    /**
+     * Unlinks the queue from the circular list. If it was the current queue, the selection
+     * moves to its successor, or to nothing when the list became empty.
+     */
+    public void remove(Queue<T> queue) {
+      // The successor must be read before unlinking: IterableList.remove() clears iterNext.
+      Queue<T> nextQueue = queue.iterNext;
+      queueHead = IterableList.remove(queueHead, queue);
+      if (currentQueue == queue) {
+        setNextQueue(queueHead != null ? nextQueue : null);
+      }
+    }
+
+    /**
+     * Selects the queue to serve next.
+     * @return the selected queue, or null if there is no queue to move to. When the current
+     *   queue is not available the list is walked until an available queue is found; if the
+     *   walk gets back to the starting queue, that queue is returned even though it reported
+     *   itself as not available.
+     */
+    public Queue<T> poll() {
+      if (currentQuantum == 0) {
+        // quantum exhausted (or nothing selected): advance to the next queue
+        if (!nextQueue()) {
+          return null; // nothing here
+        }
+        // this poll already consumes one unit of the new quantum
+        currentQuantum = calculateQuantum(currentQueue) - 1;
+      } else {
+        currentQuantum--;
+      }
+
+      // This should go away when we have the new AM and its events
+      if (!currentQueue.isAvailable()) {
+        Queue<T> lastQueue = currentQueue;
+        do {
+          if (!nextQueue()) {
+            return null;
+          }
+        } while (currentQueue != lastQueue && !currentQueue.isAvailable());
+
+        currentQuantum = calculateQuantum(currentQueue) - 1;
+      }
+      return currentQueue;
+    }
+
+    /**
+     * Moves the selection to the successor of the current queue.
+     * @return false if there is no current queue or the current queue has no successor
+     */
+    private boolean nextQueue() {
+      if (currentQueue == null) return false;
+      currentQueue = currentQueue.iterNext;
+      return currentQueue != null;
+    }
+
+    /** Selects the given queue (may be null) and resets the remaining quantum accordingly. */
+    private void setNextQueue(Queue<T> queue) {
+      currentQueue = queue;
+      if (queue != null) {
+        currentQuantum = calculateQuantum(currentQueue);
+      } else {
+        currentQuantum = 0;
+      }
+    }
+
+    /**
+     * @return the queue priority multiplied by the configured quantum, but never less than 1
+     */
+    private int calculateQuantum(final Queue<T> queue) {
+      return Math.max(1, queue.getPriority() * quantum); // TODO
+    }
+  }
+
+  /**
+   * Static helpers implementing an AVL tree whose nodes are the {@link Queue} objects
+   * themselves, linked through their avlLeft/avlRight fields and ordered by key.
+   * Mutating operations return the new root of the (sub)tree they were applied to.
+   */
+  private static class AvlTree {
+    /**
+     * @return the node whose key compares equal to {@code key}, or null if there is none
+     */
+    public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) {
+      Queue<T> node = root;
+      while (node != null) {
+        final int cmp = node.compareKey(key);
+        if (cmp == 0) {
+          return node;
+        }
+        node = (cmp > 0) ? node.avlLeft : node.avlRight;
+      }
+      return null;
+    }
+
+    /**
+     * @return the left-most node of the tree, or null if the tree is empty
+     */
+    public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) {
+      if (root == null) {
+        return null;
+      }
+      Queue<T> node = root;
+      while (node.avlLeft != null) {
+        node = node.avlLeft;
+      }
+      return node;
+    }
+
+    /**
+     * @return the right-most node of the tree, or null if the tree is empty
+     */
+    public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) {
+      if (root == null) {
+        return null;
+      }
+      Queue<T> node = root;
+      while (node.avlRight != null) {
+        node = node.avlRight;
+      }
+      return node;
+    }
+
+    /**
+     * Inserts {@code node} in the tree rooted at {@code root}.
+     * A node that does not compare lower than the visited node goes to its right.
+     * @return the root of the rebalanced tree
+     */
+    public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) {
+      if (root == null) {
+        return node;
+      }
+      if (node.compareTo(root) >= 0) {
+        root.avlRight = insert(root.avlRight, node);
+      } else {
+        root.avlLeft = insert(root.avlLeft, node);
+      }
+      return balance(root);
+    }
+
+    /**
+     * Detaches the left-most node of the subtree rooted at {@code p}.
+     * @return the root of the rebalanced subtree without its left-most node
+     */
+    private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) {
+      if (p.avlLeft != null) {
+        p.avlLeft = removeMin(p.avlLeft);
+        return balance(p);
+      }
+      return p.avlRight;
+    }
+
+    /**
+     * Removes the node whose key compares equal to {@code key}, if present.
+     * @return the root of the rebalanced tree
+     */
+    public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) {
+      if (root == null) {
+        return null;
+      }
+
+      final int cmp = root.compareKey(key);
+      if (cmp > 0) {
+        root.avlLeft = remove(root.avlLeft, key);
+        return balance(root);
+      }
+      if (cmp < 0) {
+        root.avlRight = remove(root.avlRight, key);
+        return balance(root);
+      }
+
+      // root is the node to remove
+      final Queue<T> left = root.avlLeft;
+      final Queue<T> right = root.avlRight;
+      if (right == null) {
+        return left;
+      }
+      // the left-most node of the right subtree takes the place of the removed node
+      final Queue<T> successor = getFirst(right);
+      successor.avlRight = removeMin(right);
+      successor.avlLeft = left;
+      return balance(successor);
+    }
+
+    /**
+     * Refreshes the height of {@code p} and rotates when the right and left subtree heights
+     * differ by two.
+     * @return the root of the subtree after the rotations, if any
+     */
+    private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) {
+      fixHeight(p);
+      switch (balanceFactor(p)) {
+        case 2:
+          // right-heavy
+          if (balanceFactor(p.avlRight) < 0) {
+            p.avlRight = rotateRight(p.avlRight);
+          }
+          return rotateLeft(p);
+        case -2:
+          // left-heavy
+          if (balanceFactor(p.avlLeft) > 0) {
+            p.avlLeft = rotateLeft(p.avlLeft);
+          }
+          return rotateRight(p);
+        default:
+          return p;
+      }
+    }
+
+    /** Right rotation: the left child of {@code p} becomes the root of the subtree. */
+    private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) {
+      final Queue<T> pivot = p.avlLeft;
+      p.avlLeft = pivot.avlRight;
+      pivot.avlRight = p;
+      // the demoted node must be fixed first, the new root depends on its height
+      fixHeight(p);
+      fixHeight(pivot);
+      return pivot;
+    }
+
+    /** Left rotation: the right child of {@code q} becomes the root of the subtree. */
+    private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) {
+      final Queue<T> pivot = q.avlRight;
+      q.avlRight = pivot.avlLeft;
+      pivot.avlLeft = q;
+      // the demoted node must be fixed first, the new root depends on its height
+      fixHeight(q);
+      fixHeight(pivot);
+      return pivot;
+    }
+
+    /** Recomputes the height of {@code node} from the heights of its children. */
+    private static <T extends Comparable<T>> void fixHeight(Queue<T> node) {
+      node.avlHeight = 1 + Math.max(height(node.avlLeft), height(node.avlRight));
+    }
+
+    /**
+     * @return the stored height of {@code node}, or 0 for a null node
+     */
+    private static <T extends Comparable<T>> int height(Queue<T> node) {
+      if (node == null) {
+        return 0;
+      }
+      return node.avlHeight;
+    }
+
+    /**
+     * @return height of the right subtree minus height of the left subtree
+     */
+    private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) {
+      final int rightHeight = height(node.avlRight);
+      final int leftHeight = height(node.avlLeft);
+      return rightHeight - leftHeight;
+    }
+  }
+
+  /**
+   * Static helpers implementing a circular doubly-linked list whose nodes are the
+   * {@link Queue} objects themselves, linked through their iterPrev/iterNext fields.
+   * A single-node list points to itself in both directions; an unlinked node has both
+   * links set to null. Every operation returns the head of the resulting list.
+   */
+  private static class IterableList {
+    /**
+     * Links {@code node} in front of {@code head}.
+     * @return {@code node}, which is the new head
+     */
+    public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) {
+      assert !isLinked(node) : node + " is already linked";
+      if (head == null) {
+        node.iterNext = node;
+        node.iterPrev = node;
+        return node;
+      }
+      final Queue<T> last = head.iterPrev;
+      last.iterNext = node;
+      head.iterPrev = node;
+      node.iterNext = head;
+      node.iterPrev = last;
+      return node;
+    }
+
+    /**
+     * Links {@code node} after the current tail of the list.
+     * @return {@code head}, or {@code node} if the list was empty
+     */
+    public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) {
+      assert !isLinked(node) : node + " is already linked";
+      if (head == null) {
+        node.iterNext = node;
+        node.iterPrev = node;
+        return node;
+      }
+      final Queue<T> last = head.iterPrev;
+      last.iterNext = node;
+      node.iterNext = head;
+      node.iterPrev = last;
+      head.iterPrev = node;
+      return head;
+    }
+
+    /**
+     * Concatenates the list starting at {@code otherHead} after the list starting at
+     * {@code head}.
+     * @return {@code head}, or {@code otherHead} if the first list was empty
+     */
+    public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) {
+      if (head == null) {
+        return otherHead;
+      }
+      if (otherHead == null) {
+        return head;
+      }
+
+      final Queue<T> last = head.iterPrev;
+      final Queue<T> otherLast = otherHead.iterPrev;
+      last.iterNext = otherHead;
+      otherHead.iterPrev = last;
+      otherLast.iterNext = head;
+      head.iterPrev = otherLast;
+      return head;
+    }
+
+    /**
+     * Unlinks {@code node} from the list and clears its links.
+     * @return the head of the list after the removal, or null if the list became empty
+     */
+    private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) {
+      assert isLinked(node) : node + " is not linked";
+      final Queue<T> newHead;
+      if (node == node.iterNext) {
+        // node was the only element
+        newHead = null;
+      } else {
+        node.iterPrev.iterNext = node.iterNext;
+        node.iterNext.iterPrev = node.iterPrev;
+        newHead = (head == node) ? node.iterNext : head;
+      }
+      node.iterNext = null;
+      node.iterPrev = null;
+      return newHead;
+    }
+
+    /**
+     * @return true if both links of {@code node} are set
+     */
+    private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) {
+      if (node.iterPrev == null) {
+        return false;
+      }
+      return node.iterNext != null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index a6f97da..bd4f9e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_MODIFY_FAMILY.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family");
   }
 
   @Override
@@ -379,4 +376,4 @@ public class ModifyColumnFamilyProcedure
       });
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 13a2496..329f717 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -215,10 +215,8 @@ public class ModifyTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      getTableName(),
-      EventType.C_M_MODIFY_TABLE.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table");
   }
 
   @Override
@@ -508,4 +506,4 @@ public class ModifyTableProcedure
     }
     return regionInfoList;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 9e0b86e..cb8b637 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -588,13 +588,13 @@ implements ServerProcedureInterface {
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false;
-    return env.getProcedureQueue().tryAcquireServerExclusiveLock(this);
+    if (env.waitServerCrashProcessingEnabled(this)) return false;
+    return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseServerExclusiveLock(this);
+    env.getProcedureQueue().releaseServerExclusiveLock(getServerName());
   }
 
   @Override
@@ -788,6 +788,11 @@ implements ServerProcedureInterface {
     return this.carryingMeta;
   }
 
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.CRASH_HANDLER;
+  }
+
   /**
    * For this procedure, yield at end of each successful flow step so that all crashed servers
    * can make progress rather than do the default which has each procedure running to completion

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index 5b0c45f..b5c24ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface ServerProcedureInterface {
+  public enum ServerOperationType {
+    CRASH_HANDLER
+  };
+
   /**
    * @return Name of this server instance.
    */
@@ -37,4 +41,12 @@ public interface ServerProcedureInterface {
    * @return True if this server has an hbase:meta table region.
    */
   boolean hasMetaTableRegion();
-}
\ No newline at end of file
+
+  /**
+   * Given an operation type we can take decisions about what to do with pending operations.
+   * e.g. if we get a crash handler and we have some assignment operation pending
+   * we can abort those operations.
+   * @return the operation type that the procedure is executing.
+   */
+  ServerOperationType getServerOperationType();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index 3c1e593..da220f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -181,7 +181,7 @@ public class TruncateTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
+    if (env.waitInitialized(this)) return false;
     return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
index 70cb2fc..34715aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
@@ -129,14 +129,14 @@ public class TestMaster {
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
     HMaster m = cluster.getMaster();
     try {
-      m.initialized = false; // fake it, set back later
+      m.setInitialized(false); // fake it, set back later
       HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
       m.move(meta.getEncodedNameAsBytes(), null);
       fail("Region should not be moved since master is not initialized");
     } catch (IOException ioe) {
       assertTrue(ioe instanceof PleaseHoldException);
     } finally {
-      m.initialized = true;
+      m.setInitialized(true);
     }
   }
 
@@ -173,13 +173,13 @@ public class TestMaster {
     try {
       List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
 
-      master.initialized = false; // fake it, set back later
+      master.setInitialized(false); // fake it, set back later
       admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null);
       fail("Region should not be moved since master is not initialized");
     } catch (IOException ioe) {
       assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException"));
     } finally {
-      master.initialized = true;
+      master.setInitialized(true);
       TEST_UTIL.deleteTable(tableName);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index fe93bb5..e27b3a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -308,7 +308,7 @@ public class TestMasterNoCluster {
 
     try {
       // Wait till master is initialized.
-      while (!master.initialized) Threads.sleep(10);
+      while (!master.isInitialized()) Threads.sleep(10);
       LOG.info("Master is initialized");
 
       assertFalse("The dead server should not be pulled in",

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
new file mode 100644
index 0000000..0027c2f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks how often the procedure scheduler is polled while a submitted procedure cannot run
+ * because the master is flagged as not initialized, or because server crash processing is
+ * disabled: the poll counter must not keep growing and no poll may come back empty.
+ */
+@Category({MasterTests.class, MediumTests.class})
+public class TestMasterProcedureEvents {
+  // Log under this class; the logger was previously created for TestCreateTableProcedure.
+  private static final Log LOG = LogFactory.getLog(TestMasterProcedureEvents.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static long nonceGroup = HConstants.NO_NONCE;
+  private static long nonce = HConstants.NO_NONCE;
+
+  private static void setupConf(Configuration conf) {
+    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
+    conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  /**
+   * Submits a CreateTableProcedure while the master is flagged as not initialized and checks
+   * that the scheduler is polled exactly once for it during the wait, then once more after
+   * the flag is set back.
+   */
+  @Test
+  public void testMasterInitializedEvent() throws Exception {
+    TableName tableName = TableName.valueOf("testMasterInitializedEvent");
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
+
+    HRegionInfo hri = new HRegionInfo(tableName);
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("f");
+    htd.addFamily(hcd);
+
+    while (!master.isInitialized()) Thread.sleep(250);
+    master.setInitialized(false); // fake it, set back later
+
+    CreateTableProcedure proc = new CreateTableProcedure(
+      procExec.getEnvironment(), htd, new HRegionInfo[] { hri });
+
+    long pollCalls = procSched.getPollCalls();
+    long nullPollCalls = procSched.getNullPollCalls();
+
+    long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    // For about a second the counters must stay put: one poll for the submit, no empty polls.
+    for (int i = 0; i < 10; ++i) {
+      Thread.sleep(100);
+      assertEquals(pollCalls + 1, procSched.getPollCalls());
+      assertEquals(nullPollCalls, procSched.getNullPollCalls());
+    }
+
+    master.setInitialized(true);
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+    assertEquals(pollCalls + 2, procSched.getPollCalls());
+    assertEquals(nullPollCalls, procSched.getNullPollCalls());
+  }
+
+  /**
+   * Submits a ServerCrashProcedure for a killed region server while server crash processing
+   * is disabled and checks that the scheduler is polled exactly once for it during the wait,
+   * and at least once more after crash processing is enabled again.
+   */
+  @Test
+  public void testServerCrashProcedureEvent() throws Exception {
+    TableName tableName = TableName.valueOf("testServerCrashProcedureEventTb");
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
+
+    while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
+        master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Thread.sleep(25);
+    }
+
+    UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
+    try (Table t = UTIL.getConnection().getTable(tableName)) {
+      // Load the table with a bit of data so that there are some logs to split
+      // and some edits in each region.
+      UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
+    }
+
+    master.setServerCrashProcessingEnabled(false);  // fake it, set back later
+
+    long pollCalls = procSched.getPollCalls();
+    long nullPollCalls = procSched.getNullPollCalls();
+
+    // Kill a server. Master will notice but do nothing other than add it to list of dead servers.
+    HRegionServer hrs = getServerWithRegions();
+    boolean carryingMeta = master.getAssignmentManager()
+        .isCarryingMeta(hrs.getServerName()) == AssignmentManager.ServerHostRegion.HOSTING_REGION;
+    UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
+    hrs.join();
+
+    // Wait until the expiration of the server has arrived at the master. We won't process it
+    // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
+    // here so ServerManager gets notice and adds expired server to appropriate queues.
+    while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
+
+    // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
+    master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
+
+    long procId = procExec.submitProcedure(
+      new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta));
+
+    // For about a second the counters must stay put: one poll for the submit, no empty polls.
+    for (int i = 0; i < 10; ++i) {
+      Thread.sleep(100);
+      assertEquals(pollCalls + 1, procSched.getPollCalls());
+      assertEquals(nullPollCalls, procSched.getNullPollCalls());
+    }
+
+    // Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
+    master.setServerCrashProcessingEnabled(true);
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+    LOG.debug("server crash processing poll calls: " + procSched.getPollCalls());
+    assertTrue(procSched.getPollCalls() >= (pollCalls + 2));
+    assertEquals(nullPollCalls, procSched.getNullPollCalls());
+
+    UTIL.deleteTable(tableName);
+  }
+
+  /**
+   * @return the first of the three region servers of the mini cluster that reports at least
+   *   one online region, or null if none of them does
+   */
+  private HRegionServer getServerWithRegions() {
+    for (int i = 0; i < 3; ++i) {
+      HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
+      if (hrs.getNumberOfOnlineRegions() > 0) {
+        return hrs;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
deleted file mode 100644
index 68384ce..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(SmallTests.class)
-public class TestMasterProcedureQueue {
-  private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);
-
-  private MasterProcedureQueue queue;
-  private Configuration conf;
-
-  /**
-   * Builds a fresh configuration and a new queue, wired to a NullTableLockManager,
-   * before every test.
-   */
-  @Before
-  public void setUp() throws IOException {
-    final Configuration freshConf = HBaseConfiguration.create();
-    this.queue = new MasterProcedureQueue(freshConf,
-      new TableLockManager.NullTableLockManager());
-    this.conf = freshConf;
-  }
-
-  /** Checks, after every test, that nothing was left behind in the queue. */
-  @After
-  public void tearDown() throws IOException {
-    final long leftover = this.queue.size();
-    assertEquals(0L, leftover);
-  }
-
-  @Test
-  public void testConcurrentCreateDelete() throws Exception {
-    final MasterProcedureQueue procQueue = queue;
-    final TableName table = TableName.valueOf("testtb");
-    final AtomicBoolean running = new AtomicBoolean(true);
-    final AtomicBoolean failure = new AtomicBoolean(false);
-    Thread createThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          while (running.get() && !failure.get()) {
-            if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
-              procQueue.releaseTableExclusiveLock(table);
-            }
-          }
-        } catch (Throwable e) {
-          LOG.error("create failed", e);
-          failure.set(true);
-        }
-      }
-    };
-
-    Thread deleteThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          while (running.get() && !failure.get()) {
-            if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
-              procQueue.releaseTableExclusiveLock(table);
-            }
-            procQueue.markTableAsDeleted(table);
-          }
-        } catch (Throwable e) {
-          LOG.error("delete failed", e);
-          failure.set(true);
-        }
-      }
-    };
-
-    createThread.start();
-    deleteThread.start();
-    for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
-      Thread.sleep(100);
-    }
-    running.set(false);
-    createThread.join();
-    deleteThread.join();
-    assertEquals(false, failure.get());
-  }
-
-  /**
-   * Basic add/poll/delete round trip over several table queues. Procedures are
-   * added table by table and are expected back from poll() one per table in turn,
-   * in insertion order; afterwards every table queue must be deletable.
-   */
-  @Test
-  public void testSimpleTableOpsQueues() throws Exception {
-    final int NUM_TABLES = 10;
-    final int NUM_ITEMS = 10;
-
-    // enqueue NUM_ITEMS edit procedures per table, checking the size after each add
-    int expectedSize = 0;
-    for (int table = 1; table <= NUM_TABLES; ++table) {
-      final TableName tableName = TableName.valueOf(String.format("test-%04d", table));
-      for (int item = 1; item <= NUM_ITEMS; ++item) {
-        final TestTableProcedure proc = new TestTableProcedure(table * 1000 + item, tableName,
-          TableProcedureInterface.TableOperationType.EDIT);
-        queue.addBack(proc);
-        expectedSize++;
-        assertEquals(expectedSize, queue.size());
-      }
-    }
-    assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
-
-    // drain: the n-th item of every table comes out before the (n+1)-th of any table
-    for (int item = 1; item <= NUM_ITEMS; ++item) {
-      for (int table = 1; table <= NUM_TABLES; ++table) {
-        final Long polled = queue.poll();
-        expectedSize--;
-        assertEquals(expectedSize, queue.size());
-        assertEquals(table * 1000 + item, polled.longValue());
-      }
-    }
-    assertEquals(0, queue.size());
-
-    // with nothing queued, every table queue can be removed
-    for (int table = 1; table <= NUM_TABLES; ++table) {
-      final TableName tableName = TableName.valueOf(String.format("test-%04d", table));
-      assertTrue(queue.markTableAsDeleted(tableName));
-    }
-  }
-
-  /**
-   * A table queue must not be deletable while a procedure is still queued for the
-   * table or while the table's exclusive (write) lock is held.
-   */
-  @Test
-  public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
-    final TableName table = TableName.valueOf("testtb");
-    final TestTableProcedure editProc = new TestTableProcedure(1, table,
-      TableProcedureInterface.TableOperationType.EDIT);
-    queue.addBack(editProc);
-
-    // a pending procedure keeps the table queue alive
-    assertFalse(queue.markTableAsDeleted(table));
-
-    // drain the queue, then grab the exclusive lock
-    final Long polledId = queue.poll();
-    assertEquals(1, polledId.longValue());
-    assertTrue(queue.tryAcquireTableExclusiveLock(table, "write"));
-    // the queue is empty, but the held lock still blocks the deletion
-    assertEquals(0, queue.size());
-    assertFalse(queue.markTableAsDeleted(table));
-    // once the lock is released the table queue can go away
-    queue.releaseTableExclusiveLock(table);
-    assertTrue(queue.markTableAsDeleted(table));
-  }
-
-  /**
-   * Check that the table queue is not deletable until every procedure
-   * in-progress is completed (this is a special case for read-locks).
-   */
-  @Test
-  public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
-    final TableName tableName = TableName.valueOf("testtb");
-    final int nitems = 2;
-
-    for (int i = 1; i <= nitems; ++i) {
-      queue.addBack(new TestTableProcedure(i, tableName,
-            TableProcedureInterface.TableOperationType.READ));
-    }
-
-    // table can't be deleted because one item is in the queue
-    assertFalse(queue.markTableAsDeleted(tableName));
-
-    for (int i = 1; i <= nitems; ++i) {
-      // fetch item and take a lock
-      assertEquals(i, queue.poll().longValue());
-      // take the rlock
-      assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
-      // table can't be deleted because we have locks and/or items in the queue
-      assertFalse(queue.markTableAsDeleted(tableName));
-    }
-
-    for (int i = 1; i <= nitems; ++i) {
-      // table can't be deleted because we have locks
-      assertFalse(queue.markTableAsDeleted(tableName));
-      // release the rlock
-      queue.releaseTableSharedLock(tableName);
-    }
-
-    // there are no items and no lock in the queeu
-    assertEquals(0, queue.size());
-    // complete the table deletion
-    assertTrue(queue.markTableAsDeleted(tableName));
-  }
-
-  /**
-   * Walks a single table queue holding EDIT, READ, EDIT, READ, READ procedures
-   * (ids 1 to 5) and checks how poll() and the exclusive/shared table locks interact
-   * at every step.
-   */
-  @Test
-  public void testVerifyRwLocks() throws Exception {
-    final TableName tableName = TableName.valueOf("testtb");
-    final TableProcedureInterface.TableOperationType[] opTypes = {
-      TableProcedureInterface.TableOperationType.EDIT,
-      TableProcedureInterface.TableOperationType.READ,
-      TableProcedureInterface.TableOperationType.EDIT,
-      TableProcedureInterface.TableOperationType.READ,
-      TableProcedureInterface.TableOperationType.READ,
-    };
-    // proc ids are 1-based and follow the order of opTypes
-    for (int k = 0; k < opTypes.length; ++k) {
-      queue.addBack(new TestTableProcedure(k + 1, tableName, opTypes[k]));
-    }
-
-    // item 1 (EDIT): poll it and take the write lock
-    Long current = queue.poll();
-    assertEquals(1, current.longValue());
-    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + current));
-
-    // while the write lock is held nothing can be polled
-    assertEquals(null, queue.poll());
-
-    // drop the write lock so that the next item becomes available
-    queue.releaseTableExclusiveLock(tableName);
-
-    // item 2 (READ): poll it and take a read lock
-    current = queue.poll();
-    assertEquals(2, current.longValue());
-    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + current));
-
-    // item 3 (EDIT): it is polled, but the write lock is refused while item 2 reads
-    current = queue.poll();
-    assertEquals(3, current.longValue());
-    assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + current));
-
-    // once item 2's read lock is released, item 3 gets its write lock
-    queue.releaseTableSharedLock(tableName);
-    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + current));
-
-    // item 4 cannot be polled while the write lock is held
-    assertEquals(null, queue.poll());
-
-    // drop item 3's write lock
-    queue.releaseTableExclusiveLock(tableName);
-
-    // item 4 (READ): poll it and take a read lock
-    current = queue.poll();
-    assertEquals(4, current.longValue());
-    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + current));
-
-    // item 5 (READ): poll it and take a second, concurrent read lock
-    current = queue.poll();
-    assertEquals(5, current.longValue());
-    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + current));
-
-    // release the read locks of items 4 and 5
-    queue.releaseTableSharedLock(tableName);
-    queue.releaseTableSharedLock(tableName);
-
-    // nothing queued and nothing locked: the table queue can be removed
-    assertEquals(0, queue.size());
-    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
-  }
-
-  /**
-   * Verify that "write" operations for a single table are serialized,
-   * but different tables can be executed in parallel.
-   *
-   * EDIT procedures are queued for NUM_TABLES tables and consumed by
-   * NUM_TABLES * 2 worker threads. Each worker records the table it is working on
-   * in a shared set (a second worker on the same table makes the add fail) and
-   * bumps a counter that must never exceed NUM_TABLES.
-   */
-  @Test(timeout=90000)
-  public void testConcurrentWriteOps() throws Exception {
-    final TestTableProcSet procSet = new TestTableProcSet(queue);
-
-    final int NUM_ITEMS = 10;
-    final int NUM_TABLES = 4;
-    final AtomicInteger opsCount = new AtomicInteger(0);
-    // Tables are indexed from 0. Note that j starts at 1, so NUM_ITEMS - 1 procedures
-    // are queued per table; opsCount tracks the number actually queued.
-    for (int i = 0; i < NUM_TABLES; ++i) {
-      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
-      for (int j = 1; j < NUM_ITEMS; ++j) {
-        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-        opsCount.incrementAndGet();
-      }
-    }
-    assertEquals(opsCount.get(), queue.size());
-
-    final Thread[] threads = new Thread[NUM_TABLES * 2];
-    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
-    final ArrayList<String> failures = new ArrayList<String>();
-    final AtomicInteger concurrentCount = new AtomicInteger(0);
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i] = new Thread() {
-        @Override
-        public void run() {
-          while (opsCount.get() > 0) {
-            try {
-              TableProcedureInterface proc = procSet.acquire();
-              if (proc == null) {
-                // nothing runnable right now: wake the other workers and retry
-                // while there is still work left
-                queue.signalAll();
-                if (opsCount.get() > 0) {
-                  continue;
-                }
-                break;
-              }
-              // add() returns false if another worker already holds this table
-              synchronized (concurrentTables) {
-                assertTrue("unexpected concurrency on " + proc.getTableName(),
-                  concurrentTables.add(proc.getTableName()));
-              }
-              assertTrue(opsCount.decrementAndGet() >= 0);
-              try {
-                long procId = ((Procedure)proc).getProcId();
-                TableName tableId = proc.getTableName();
-                int concurrent = concurrentCount.incrementAndGet();
-                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
-                  concurrent >= 1 && concurrent <= NUM_TABLES);
-                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
-                // simulate some work while the table lock is held
-                Thread.sleep(2000);
-                concurrent = concurrentCount.decrementAndGet();
-                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
-                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
-              } finally {
-                synchronized (concurrentTables) {
-                  assertTrue(concurrentTables.remove(proc.getTableName()));
-                }
-                procSet.release(proc);
-              }
-            } catch (Throwable e) {
-              // assertion failures raised on a worker thread are collected here and
-              // reported by the main thread after the join
-              LOG.error("Failed " + e.getMessage(), e);
-              synchronized (failures) {
-                failures.add(e.getMessage());
-              }
-            } finally {
-              queue.signalAll();
-            }
-          }
-        }
-      };
-      threads[i].start();
-    }
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].join();
-    }
-    assertTrue(failures.toString(), failures.isEmpty());
-    assertEquals(0, opsCount.get());
-    assertEquals(0, queue.size());
-
-    // Clean up exactly the tables created above (indices 0..NUM_TABLES-1). The previous
-    // 1..NUM_TABLES range skipped testtb-0000 and checked a table that was never used.
-    for (int i = 0; i < NUM_TABLES; ++i) {
-      TableName table = TableName.valueOf(String.format("testtb-%04d", i));
-      assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
-    }
-  }
-
-  /**
-   * Test helper wrapped around a MasterProcedureQueue. It remembers every procedure
-   * it pushes, keyed by proc id, so that an id returned by poll() can be mapped back
-   * to its procedure and the table lock matching its operation type can be taken.
-   */
-  public static class TestTableProcSet {
-    private final MasterProcedureQueue queue;
-    private Map<Long, TableProcedureInterface> procsMap =
-      new ConcurrentHashMap<Long, TableProcedureInterface>();
-
-    public TestTableProcSet(final MasterProcedureQueue queue) {
-      this.queue = queue;
-    }
-
-    /** Remembers the procedure and appends it to the tail of the queue. */
-    public void addBack(TableProcedureInterface tableProc) {
-      queue.addBack(remember(tableProc));
-    }
-
-    /** Remembers the procedure and inserts it at the head of the queue. */
-    public void addFront(TableProcedureInterface tableProc) {
-      queue.addFront(remember(tableProc));
-    }
-
-    /**
-     * Polls the queue until a procedure is found whose table lock can be taken.
-     * A procedure whose lock is refused is pushed back to the front of the queue
-     * and the poll is retried.
-     *
-     * @return the procedure now holding its table lock, or null when poll() yields
-     *         nothing (or yields an id this set does not know)
-     */
-    public TableProcedureInterface acquire() {
-      for (;;) {
-        final Long procId = queue.poll();
-        final TableProcedureInterface candidate =
-          procId != null ? procsMap.remove(procId) : null;
-        if (candidate == null) {
-          return null;
-        }
-        if (tryLockFor(candidate)) {
-          return candidate;
-        }
-        addFront(candidate);
-        LOG.debug("yield procId=" + procId);
-      }
-    }
-
-    /** Releases the table lock that acquire() took for the given procedure. */
-    public void release(TableProcedureInterface proc) {
-      final TableName table = proc.getTableName();
-      switch (proc.getTableOperationType()) {
-        case READ:
-          queue.releaseTableSharedLock(table);
-          break;
-        case CREATE:
-        case DELETE:
-        case EDIT:
-          queue.releaseTableExclusiveLock(table);
-          break;
-      }
-    }
-
-    /** Stores the procedure under its proc id and hands it back as a Procedure. */
-    private Procedure remember(TableProcedureInterface tableProc) {
-      final Procedure proc = (Procedure)tableProc;
-      procsMap.put(proc.getProcId(), tableProc);
-      return proc;
-    }
-
-    /**
-     * Tries to take the exclusive lock for CREATE/DELETE/EDIT, or the shared lock for
-     * READ. Any other operation type takes no lock and is reported as not acquired.
-     */
-    private boolean tryLockFor(TableProcedureInterface proc) {
-      switch (proc.getTableOperationType()) {
-        case CREATE:
-        case DELETE:
-        case EDIT:
-          return queue.tryAcquireTableExclusiveLock(proc.getTableName(),
-            "op="+ proc.getTableOperationType());
-        case READ:
-          return queue.tryAcquireTableSharedLock(proc.getTableName(),
-            "op="+ proc.getTableOperationType());
-        default:
-          return false;
-      }
-    }
-  }
-
-  /**
-   * Bare-bones table procedure used to feed the queue in these tests. It carries a
-   * proc id, a table name and an operation type; execute() does nothing, rollback()
-   * and abort() are unsupported, and no state is serialized.
-   */
-  public static class TestTableProcedure extends Procedure<Void>
-      implements TableProcedureInterface {
-    private final TableOperationType opType;
-    private final TableName tableName;
-
-    /** No-arg constructor; always throws, since these tests never go through recovery. */
-    public TestTableProcedure() {
-      throw new UnsupportedOperationException("recovery should not be triggered here");
-    }
-
-    /**
-     * @param procId id to assign to this procedure
-     * @param tableName table the procedure operates on
-     * @param opType kind of table operation the procedure stands for
-     */
-    public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
-      setProcId(procId);
-      this.opType = opType;
-      this.tableName = tableName;
-    }
-
-    @Override
-    public TableOperationType getTableOperationType() {
-      return this.opType;
-    }
-
-    @Override
-    public TableName getTableName() {
-      return this.tableName;
-    }
-
-    /** Does nothing and returns null. */
-    @Override
-    protected Procedure[] execute(Void env) {
-      return null;
-    }
-
-    /** Not supported by this test procedure. */
-    @Override
-    protected boolean abort(Void env) {
-      throw new UnsupportedOperationException();
-    }
-
-    /** Not supported by this test procedure. */
-    @Override
-    protected void rollback(Void env) {
-      throw new UnsupportedOperationException();
-    }
-
-    /** Nothing to write: the procedure has no persistent state. */
-    @Override
-    protected void serializeStateData(final OutputStream stream) throws IOException {}
-
-    /** Nothing to read: the procedure has no persistent state. */
-    @Override
-    protected void deserializeStateData(final InputStream stream) throws IOException {}
-  }
-}


Mime
View raw message