hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [9/9] hbase git commit: HBASE-14837 Procedure v2 - Procedure Queue Improvement
Date Thu, 14 Jan 2016 17:26:05 GMT
HBASE-14837 Procedure v2 - Procedure Queue Improvement


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3c2229a9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3c2229a9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3c2229a9

Branch: refs/heads/branch-1
Commit: 3c2229a9a8aa988c93e3d389edb4942ee2bc36df
Parents: b3c5f09
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Thu Jan 14 08:29:10 2016 -0800
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Thu Jan 14 09:25:06 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ProcedureInfo.java  |    4 +-
 .../hbase/procedure2/ProcedureExecutor.java     |    3 +-
 .../procedure2/ProcedureFairRunQueues.java      |  174 ---
 .../hbase/procedure2/ProcedureRunnableSet.java  |    4 +-
 .../procedure2/ProcedureSimpleRunQueue.java     |    8 +-
 .../procedure2/TestProcedureFairRunQueues.java  |  154 ---
 .../org/apache/hadoop/hbase/master/HMaster.java |   39 +-
 .../procedure/AddColumnFamilyProcedure.java     |    9 +-
 .../procedure/CreateNamespaceProcedure.java     |    4 +-
 .../master/procedure/CreateTableProcedure.java  |    2 +-
 .../procedure/DeleteColumnFamilyProcedure.java  |    9 +-
 .../master/procedure/DeleteTableProcedure.java  |    2 +-
 .../master/procedure/DisableTableProcedure.java |    7 +-
 .../master/procedure/EnableTableProcedure.java  |    8 +-
 .../master/procedure/MasterProcedureEnv.java    |   34 +-
 .../master/procedure/MasterProcedureQueue.java  |  578 --------
 .../procedure/MasterProcedureScheduler.java     | 1241 ++++++++++++++++++
 .../procedure/ModifyColumnFamilyProcedure.java  |    9 +-
 .../master/procedure/ModifyTableProcedure.java  |    8 +-
 .../master/procedure/ServerCrashProcedure.java  |   11 +-
 .../procedure/ServerProcedureInterface.java     |   14 +-
 .../procedure/TruncateTableProcedure.java       |    2 +-
 .../apache/hadoop/hbase/master/TestMaster.java  |    8 +-
 .../hbase/master/TestMasterNoCluster.java       |    2 +-
 .../procedure/TestMasterProcedureEvents.java    |  181 +++
 .../procedure/TestMasterProcedureQueue.java     |  480 -------
 .../procedure/TestMasterProcedureScheduler.java |  487 +++++++
 27 files changed, 2024 insertions(+), 1458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index 4a15857..0cd4103 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -222,10 +222,10 @@ public class ProcedureInfo {
       procProto.getOwner(),
       procProto.getState(),
       procProto.hasParentId() ? procProto.getParentId() : -1,
-          procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null,
+      procProto.hasException() ? procProto.getException() : null,
       procProto.getLastUpdate(),
       procProto.getStartTime(),
-      procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null);
+      procProto.hasResult() ? procProto.getResult().toByteArray() : null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 67ab119..95990e8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -785,8 +785,7 @@ public class ProcedureExecutor<TEnvironment> {
    */
   private void execLoop() {
     while (isRunning()) {
-      Long procId = runnables.poll();
-      Procedure proc = procId != null ? procedures.get(procId) : null;
+      Procedure proc = runnables.poll();
       if (proc == null) continue;
 
       try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
deleted file mode 100644
index 242ae86..0000000
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import java.util.Map;
-
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * This class is a container of queues that allows to select a queue
- * in a round robin fashion, considering priority of the queue.
- *
- * the quantum is just how many poll() will return the same object.
- * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B
- * e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B
- * then the object priority is just a priority * quantum
- *
- * Example:
- *  - three queues (A, B, C) with priorities (1, 1, 2)
- *  - The first poll() will return A
- *  - The second poll() will return B
- *  - The third and fourth poll() will return C
- *  - and so on again and again.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
-  private ConcurrentSkipListMap<TKey, TQueue> objMap =
-    new ConcurrentSkipListMap<TKey, TQueue>();
-
-  private final ReentrantLock lock = new ReentrantLock();
-  private final int quantum;
-
-  private Map.Entry<TKey, TQueue> current = null;
-  private int currentQuantum = 0;
-
-  public interface FairObject {
-    boolean isAvailable();
-    int getPriority();
-  }
-
-  /**
-   * @param quantum how many poll() will return the same object.
-   */
-  public ProcedureFairRunQueues(final int quantum) {
-    this.quantum = quantum;
-  }
-
-  public TQueue get(final TKey key) {
-    return objMap.get(key);
-  }
-
-  public TQueue add(final TKey key, final TQueue queue) {
-    TQueue oldq = objMap.putIfAbsent(key, queue);
-    return oldq != null ? oldq : queue;
-  }
-
-  public TQueue remove(final TKey key) {
-    TQueue queue = objMap.get(key);
-    if (queue != null) {
-      lock.lock();
-      try {
-        queue = objMap.remove(key);
-        if (current != null && queue == current.getValue()) {
-          currentQuantum = 0;
-          current = null;
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-    return queue;
-  }
-
-  public void clear() {
-    lock.lock();
-    try {
-      currentQuantum = 0;
-      current = null;
-      objMap.clear();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * @return the next available item if present
-   */
-  public TQueue poll() {
-    lock.lock();
-    try {
-      TQueue queue;
-      if (currentQuantum == 0) {
-        if (nextObject() == null) {
-          // nothing here
-          return null;
-        }
-
-        queue = current.getValue();
-        currentQuantum = calculateQuantum(queue) - 1;
-      } else {
-        currentQuantum--;
-        queue = current.getValue();
-      }
-
-      if (!queue.isAvailable()) {
-        Map.Entry<TKey, TQueue> last = current;
-        // Try the next one
-        do {
-          if (nextObject() == null)
-            return null;
-        } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
-
-        queue = current.getValue();
-        currentQuantum = calculateQuantum(queue) - 1;
-      }
-
-      return queue;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append('{');
-    for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
-      builder.append(entry.getKey());
-      builder.append(':');
-      builder.append(entry.getValue());
-    }
-    builder.append('}');
-    return builder.toString();
-  }
-
-  private Map.Entry<TKey, TQueue> nextObject() {
-    Map.Entry<TKey, TQueue> next = null;
-
-    // If we have already a key, try the next one
-    if (current != null) {
-      next = objMap.higherEntry(current.getKey());
-    }
-
-    // if there is no higher key, go back to the first
-    current = (next != null) ? next : objMap.firstEntry();
-    return current;
-  }
-
-  private int calculateQuantum(final TQueue fairObject) {
-    // TODO
-    return Math.max(1, fairObject.getPriority() * quantum);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
index 2d7ba39..65df692 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
@@ -55,9 +55,9 @@ public interface ProcedureRunnableSet {
 
   /**
    * Fetch one Procedure from the queue
-   * @return the Procedure ID to execute, or null if nothing present.
+   * @return the Procedure to execute, or null if nothing present.
    */
-  Long poll();
+  Procedure poll();
 
   /**
    * In case the class is blocking on poll() waiting for items to be added,

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
index 7b17fb2..d23680d 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
-  private final Deque<Long> runnables = new ArrayDeque<Long>();
+  private final Deque<Procedure> runnables = new ArrayDeque<Procedure>();
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
 
@@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
   public void addFront(final Procedure proc) {
     lock.lock();
     try {
-      runnables.addFirst(proc.getProcId());
+      runnables.addFirst(proc);
       waitCond.signal();
     } finally {
       lock.unlock();
@@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
   public void addBack(final Procedure proc) {
     lock.lock();
     try {
-      runnables.addLast(proc.getProcId());
+      runnables.addLast(proc);
       waitCond.signal();
     } finally {
       lock.unlock();
@@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
 
   @Override
   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  public Long poll() {
+  public Procedure poll() {
     lock.lock();
     try {
       if (runnables.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
deleted file mode 100644
index 4a36665..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertEquals;
-
-@Category(SmallTests.class)
-public class TestProcedureFairRunQueues {
-  private static class TestRunQueue implements ProcedureFairRunQueues.FairObject {
-    private final int priority;
-    private final String name;
-
-    private boolean available = true;
-
-    public TestRunQueue(String name, int priority) {
-      this.name = name;
-      this.priority = priority;
-    }
-
-    @Override
-    public String toString() {
-      return name;
-    }
-
-    private void setAvailable(boolean available) {
-      this.available = available;
-    }
-
-    @Override
-    public boolean isAvailable() {
-      return available;
-    }
-
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-  }
-
-  @Test
-  public void testEmptyFairQueues() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(null, fairq.poll());
-    }
-  }
-
-  @Test
-  public void testFairQueues() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
-    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
-    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
-
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(a, fairq.poll());
-      assertEquals(b, fairq.poll());
-      assertEquals(m, fairq.poll());
-      assertEquals(m, fairq.poll());
-    }
-  }
-
-  @Test
-  public void testFairQueuesNotAvailable() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
-    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
-    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
-
-    // m is not available
-    m.setAvailable(false);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(a, fairq.poll());
-      assertEquals(b, fairq.poll());
-    }
-
-    // m is available
-    m.setAvailable(true);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(m, fairq.poll());
-      assertEquals(m, fairq.poll());
-      assertEquals(a, fairq.poll());
-      assertEquals(b, fairq.poll());
-    }
-
-    // b is not available
-    b.setAvailable(false);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(m, fairq.poll());
-      assertEquals(m, fairq.poll());
-      assertEquals(a, fairq.poll());
-    }
-
-    assertEquals(m, fairq.poll());
-    m.setAvailable(false);
-    // m should be fetched next, but is no longer available
-    assertEquals(a, fairq.poll());
-    assertEquals(a, fairq.poll());
-    b.setAvailable(true);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(b, fairq.poll());
-      assertEquals(a, fairq.poll());
-    }
-  }
-
-  @Test
-  public void testFairQueuesDelete() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
-    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
-    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
-
-    // Fetch A and then remove it
-    assertEquals(a, fairq.poll());
-    assertEquals(a, fairq.remove("A"));
-
-    // Fetch B and then remove it
-    assertEquals(b, fairq.poll());
-    assertEquals(b, fairq.remove("B"));
-
-    // Fetch M and then remove it
-    assertEquals(m, fairq.poll());
-    assertEquals(m, fairq.remove("M"));
-
-    // nothing left
-    assertEquals(null, fairq.poll());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2999789..1fcc751 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
@@ -280,14 +281,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
 
   // flag set after we complete initialization once active,
   // it is not private since it's used in unit tests
-  volatile boolean initialized = false;
+  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
 
   // flag set after master services are started,
   // initialization may not have completed yet.
   volatile boolean serviceStarted = false;
 
   // flag set after we complete assignMeta.
-  private volatile boolean serverCrashProcessingEnabled = false;
+  private final ProcedureEvent serverCrashProcessingEnabled =
+    new ProcedureEvent("server crash processing");
 
   LoadBalancer balancer;
   private RegionNormalizer normalizer;
@@ -783,7 +785,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     configurationManager.registerObserver(this.balancer);
 
     // Set master as 'initialized'.
-    initialized = true;
+    setInitialized(true);
 
     status.setStatus("Starting quota manager");
     initQuotaManager();
@@ -1002,8 +1004,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     // servers. This is required so that if meta is assigning to a server which dies after
     // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
     // stuck here waiting forever if waitForMeta is specified.
-    if (!serverCrashProcessingEnabled) {
-      serverCrashProcessingEnabled = true;
+    if (!isServerCrashProcessingEnabled()) {
+      setServerCrashProcessingEnabled(true);
       this.serverManager.processQueuedDeadServers();
     }
 
@@ -1240,7 +1242,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
 
   public boolean balance(boolean force) throws IOException {
     // if master not initialized, don't run balancer.
-    if (!this.initialized) {
+    if (!isInitialized()) {
       LOG.debug("Master has not been initialized, don't run balancer.");
       return false;
     }
@@ -1337,7 +1339,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
    * @throws CoordinatedStateException
    */
   public boolean normalizeRegions() throws IOException, CoordinatedStateException {
-    if (!this.initialized) {
+    if (!isInitialized()) {
       LOG.debug("Master has not been initialized, don't run region normalizer.");
       return false;
     }
@@ -1648,7 +1650,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     }
   }
 
-  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) 
+  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
       throws IOException {
     // FIFO compaction has some requirements
     // Actually FCP ignores periodic major compactions
@@ -1705,7 +1707,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       }
     }
   }
-  
+
   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
       String message, Exception cause) throws IOException {
@@ -2301,7 +2303,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
 
   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
     checkServiceStarted();
-    if (!this.initialized) {
+    if (!isInitialized()) {
       throw new PleaseHoldException("Master is initializing");
     }
   }
@@ -2336,6 +2338,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
    */
   @Override
   public boolean isInitialized() {
+    return initialized.isReady();
+  }
+
+  @VisibleForTesting
+  public void setInitialized(boolean isInitialized) {
+    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
+  }
+
+  public ProcedureEvent getInitializedEvent() {
     return initialized;
   }
 
@@ -2346,12 +2357,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
    */
   @Override
   public boolean isServerCrashProcessingEnabled() {
-    return this.serverCrashProcessingEnabled;
+    return serverCrashProcessingEnabled.isReady();
   }
 
   @VisibleForTesting
   public void setServerCrashProcessingEnabled(final boolean b) {
-    this.serverCrashProcessingEnabled = b;
+    procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
+  }
+
+  public ProcedureEvent getServerCrashProcessingEnabledEvent() {
+    return serverCrashProcessingEnabled;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
index b6642a0..3a98b0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_ADD_FAMILY.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family");
   }
 
   @Override
@@ -405,4 +402,4 @@ public class AddColumnFamilyProcedure
     }
     return regionInfoList;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
index 657bbfb..55fe5c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
@@ -205,7 +205,9 @@ public class CreateNamespaceProcedure
         return true;
       }
 
-      return false;
+      if (env.waitInitialized(this)) {
+        return false;
+      }
     }
     return getTableNamespaceManager(env).acquireExclusiveLock();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 8bcd3de..ad069bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -270,7 +270,7 @@ public class CreateTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized() && !getTableName().isSystemTable()) {
+    if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
       return false;
     }
     return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table");

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index c15ab98..17cf5b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -201,10 +200,8 @@ public class DeleteColumnFamilyProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_DELETE_FAMILY.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family");
   }
 
   @Override
@@ -442,4 +439,4 @@ public class DeleteColumnFamilyProcedure
     }
     return regionInfoList;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index 46345a5..71c6c2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -198,7 +198,7 @@ public class DeleteTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
+    if (env.waitInitialized(this)) return false;
     return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index dbfa694..8e80a19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.BulkAssigner;
@@ -215,10 +214,8 @@ public class DisableTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_DISABLE_TABLE.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index 7201dc7..e54d6f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
+import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.BulkAssigner;
 import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
@@ -239,10 +239,8 @@ public class EnableTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_ENABLE_TABLE.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 6700b63..090b8cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.User;
@@ -85,12 +87,12 @@ public class MasterProcedureEnv {
     }
   }
 
-  private final MasterProcedureQueue procQueue;
+  private final MasterProcedureScheduler procSched;
   private final MasterServices master;
 
   public MasterProcedureEnv(final MasterServices master) {
     this.master = master;
-    this.procQueue = new MasterProcedureQueue(master.getConfiguration(),
+    this.procSched = new MasterProcedureScheduler(master.getConfiguration(),
       master.getTableLockManager());
   }
 
@@ -114,8 +116,8 @@ public class MasterProcedureEnv {
     return master.getMasterCoprocessorHost();
   }
 
-  public MasterProcedureQueue getProcedureQueue() {
-    return procQueue;
+  public MasterProcedureScheduler getProcedureQueue() {
+    return procSched;
   }
 
   public boolean isRunning() {
@@ -125,4 +127,28 @@ public class MasterProcedureEnv {
   public boolean isInitialized() {
     return master.isInitialized();
   }
+
+  public boolean waitInitialized(Procedure proc) {
+    return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc);
+  }
+
+  public boolean waitServerCrashProcessingEnabled(Procedure proc) {
+    return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+  }
+
+  public void wake(ProcedureEvent event) {
+    procSched.wake(event);
+  }
+
+  public void suspend(ProcedureEvent event) {
+    procSched.suspend(event);
+  }
+
+  public void setEventReady(ProcedureEvent event, boolean isReady) {
+    if (isReady) {
+      procSched.wake(event);
+    } else {
+      procSched.suspend(event);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
deleted file mode 100644
index c4c7747..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues;
-import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
-
-/**
- * ProcedureRunnableSet for the Master Procedures.
- * This RunnableSet tries to provide to the ProcedureExecutor procedures
- * that can be executed without having to wait on a lock.
- * Most of the master operations can be executed concurrently, if they
- * are operating on different tables (e.g. two create table can be performed
- * at the same, time assuming table A and table B) or against two different servers; say
- * two servers that crashed at about the same time.
- *
- * <p>Each procedure should implement an interface providing information for this queue.
- * for example table related procedures should implement TableProcedureInterface.
- * each procedure will be pushed in its own queue, and based on the operation type
- * we may take smarter decision. e.g. we can abort all the operations preceding
- * a delete table, or similar.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MasterProcedureQueue implements ProcedureRunnableSet {
-  private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
-
-  // Two queues to ensure that server procedures run ahead of table precedures always.
-  private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ;
-  /**
-   * Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the
-   * server that was carrying meta should rise to the top of the queue (this is how it used to
-   * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
-   * that were carrying system tables on crash; do I need to have these servers have priority?
-   *
-   * <p>Apart from the special-casing of meta and system tables, fairq is what we want
-   */
-  private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
-
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition waitCond = lock.newCondition();
-  private final TableLockManager lockManager;
-
-  private final int metaTablePriority;
-  private final int userTablePriority;
-  private final int sysTablePriority;
-  private static final int DEFAULT_SERVER_PRIORITY = 1;
-
-  /**
-   * Keeps count across server and table queues.
-   */
-  private int queueSize;
-
-  public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
-    this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1);
-    this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1);
-    this.lockManager = lockManager;
-
-    // TODO: should this be part of the HTD?
-    metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
-    sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
-    userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
-  }
-
-  @Override
-  public void addFront(final Procedure proc) {
-    lock.lock();
-    try {
-      getRunQueueOrCreate(proc).addFront(proc);
-      queueSize++;
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void addBack(final Procedure proc) {
-    lock.lock();
-    try {
-      getRunQueueOrCreate(proc).addBack(proc);
-      queueSize++;
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void yield(final Procedure proc) {
-    addBack(proc);
-  }
-
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  public Long poll() {
-    Long pollResult = null;
-    lock.lock();
-    try {
-      if (queueSize == 0) {
-        waitCond.await();
-        if (queueSize == 0) {
-          return null;
-        }
-      }
-      // For now, let server handling have precedence over table handling; presumption is that it
-      // is more important handling crashed servers than it is running the
-      // enabling/disabling tables, etc.
-      pollResult = doPoll(serverFairQ.poll());
-      if (pollResult == null) {
-        pollResult = doPoll(tableFairQ.poll());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    } finally {
-      lock.unlock();
-    }
-    return pollResult;
-  }
-
-  private Long doPoll(final RunQueue rq) {
-    if (rq == null || !rq.isAvailable()) return null;
-    this.queueSize--;
-    return rq.poll();
-  }
-
-  @Override
-  public void signalAll() {
-    lock.lock();
-    try {
-      waitCond.signalAll();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void clear() {
-    lock.lock();
-    try {
-      serverFairQ.clear();
-      tableFairQ.clear();
-      queueSize = 0;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public int size() {
-    lock.lock();
-    try {
-      return queueSize;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public String toString() {
-    lock.lock();
-    try {
-      return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ +
-        ", serverFairQ: " + serverFairQ;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void completionCleanup(Procedure proc) {
-    if (proc instanceof TableProcedureInterface) {
-      TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
-      boolean tableDeleted;
-      if (proc.hasException()) {
-        IOException procEx =  proc.getException().unwrapRemoteException();
-        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
-          // create failed because the table already exist
-          tableDeleted = !(procEx instanceof TableExistsException);
-        } else {
-          // the operation failed because the table does not exist
-          tableDeleted = (procEx instanceof TableNotFoundException);
-        }
-      } else {
-        // the table was deleted
-        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
-      }
-      if (tableDeleted) {
-        markTableAsDeleted(iProcTable.getTableName());
-      }
-    }
-    // No cleanup for ServerProcedureInterface types, yet.
-  }
-
-  private RunQueue getRunQueueOrCreate(final Procedure proc) {
-    if (proc instanceof TableProcedureInterface) {
-      final TableName table = ((TableProcedureInterface)proc).getTableName();
-      return getRunQueueOrCreate(table);
-    }
-    if (proc instanceof ServerProcedureInterface) {
-      return getRunQueueOrCreate((ServerProcedureInterface)proc);
-    }
-    // TODO: at the moment we only have Table and Server procedures
-    // if you are implementing a non-table/non-server procedure, you have two options: create
-    // a group for all the non-table/non-server procedures or try to find a key for your
-    // non-table/non-server procedures and implement something similar to the TableRunQueue.
-    throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
-  }
-
-  private TableRunQueue getRunQueueOrCreate(final TableName table) {
-    final TableRunQueue queue = getRunQueue(table);
-    if (queue != null) return queue;
-    return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table));
-  }
-
-  private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) {
-    final ServerRunQueue queue = getRunQueue(spi.getServerName());
-    if (queue != null) return queue;
-    return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi));
-  }
-
-  private TableRunQueue createTableRunQueue(final TableName table) {
-    int priority = userTablePriority;
-    if (table.equals(TableName.META_TABLE_NAME)) {
-      priority = metaTablePriority;
-    } else if (table.isSystemTable()) {
-      priority = sysTablePriority;
-    }
-    return new TableRunQueue(priority);
-  }
-
-  private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) {
-    return new ServerRunQueue(DEFAULT_SERVER_PRIORITY);
-  }
-
-  private TableRunQueue getRunQueue(final TableName table) {
-    return (TableRunQueue)tableFairQ.get(table);
-  }
-
-  private ServerRunQueue getRunQueue(final ServerName sn) {
-    return (ServerRunQueue)serverFairQ.get(sn);
-  }
-
-  /**
-   * Try to acquire the write lock on the specified table.
-   * other operations in the table-queue will be executed after the lock is released.
-   * @param table Table to lock
-   * @param purpose Human readable reason for locking the table
-   * @return true if we were able to acquire the lock on the table, otherwise false.
-   */
-  public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
-    return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose);
-  }
-
-  /**
-   * Release the write lock taken with tryAcquireTableWrite()
-   * @param table the name of the table that has the write lock
-   */
-  public void releaseTableExclusiveLock(final TableName table) {
-    getRunQueue(table).releaseExclusiveLock(lockManager, table);
-  }
-
-  /**
-   * Try to acquire the read lock on the specified table.
-   * other read operations in the table-queue may be executed concurrently,
-   * otherwise they have to wait until all the read-locks are released.
-   * @param table Table to lock
-   * @param purpose Human readable reason for locking the table
-   * @return true if we were able to acquire the lock on the table, otherwise false.
-   */
-  public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
-    return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose);
-  }
-
-  /**
-   * Release the read lock taken with tryAcquireTableRead()
-   * @param table the name of the table that has the read lock
-   */
-  public void releaseTableSharedLock(final TableName table) {
-    getRunQueue(table).releaseSharedLock(lockManager, table);
-  }
-
-  /**
-   * Try to acquire the write lock on the specified server.
-   * @see #releaseServerExclusiveLock(ServerProcedureInterface)
-   * @param spi Server to lock
-   * @return true if we were able to acquire the lock on the server, otherwise false.
-   */
-  public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) {
-    return getRunQueueOrCreate(spi).tryExclusiveLock();
-  }
-
-  /**
-   * Release the write lock
-   * @see #tryAcquireServerExclusiveLock(ServerProcedureInterface)
-   * @param spi the server that has the write lock
-   */
-  public void releaseServerExclusiveLock(final ServerProcedureInterface spi) {
-    getRunQueue(spi.getServerName()).releaseExclusiveLock();
-  }
-
-  /**
-   * Try to acquire the read lock on the specified server.
-   * @see #releaseServerSharedLock(ServerProcedureInterface)
-   * @param spi Server to lock
-   * @return true if we were able to acquire the lock on the server, otherwise false.
-   */
-  public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) {
-    return getRunQueueOrCreate(spi).trySharedLock();
-  }
-
-  /**
-   * Release the read lock taken
-   * @see #tryAcquireServerSharedLock(ServerProcedureInterface)
-   * @param spi the server that has the read lock
-   */
-  public void releaseServerSharedLock(final ServerProcedureInterface spi) {
-    getRunQueue(spi.getServerName()).releaseSharedLock();
-  }
-
-  /**
-   * Tries to remove the queue and the table-lock of the specified table.
-   * If there are new operations pending (e.g. a new create),
-   * the remove will not be performed.
-   * @param table the name of the table that should be marked as deleted
-   * @return true if deletion succeeded, false otherwise meaning that there are
-   *    other new operations pending for that table (e.g. a new create).
-   */
-  protected boolean markTableAsDeleted(final TableName table) {
-    TableRunQueue queue = getRunQueue(table);
-    if (queue != null) {
-      lock.lock();
-      try {
-        if (queue.isEmpty() && queue.acquireDeleteLock()) {
-          tableFairQ.remove(table);
-
-          // Remove the table lock
-          try {
-            lockManager.tableDeleted(table);
-          } catch (IOException e) {
-            LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
-          }
-        } else {
-          // TODO: If there are no create, we can drop all the other ops
-          return false;
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-    return true;
-  }
-
-  private interface RunQueue extends ProcedureFairRunQueues.FairObject {
-    void addFront(Procedure proc);
-    void addBack(Procedure proc);
-    Long poll();
-    boolean acquireDeleteLock();
-  }
-
-  /**
-   * Base abstract class for RunQueue implementations.
-   * Be careful honoring synchronizations in subclasses. In here we protect access but if you are
-   * acting on a state found in here, be sure dependent code keeps synchronization.
-   * Implements basic in-memory read/write locking mechanism to prevent procedure steps being run
-   * in parallel.
-   */
-  private static abstract class AbstractRunQueue implements RunQueue {
-    // All modification of runnables happens with #lock held.
-    private final Deque<Long> runnables = new ArrayDeque<Long>();
-    private final int priority;
-    private boolean exclusiveLock = false;
-    private int sharedLock = 0;
-
-    public AbstractRunQueue(int priority) {
-      this.priority = priority;
-    }
-
-    boolean isEmpty() {
-      return this.runnables.isEmpty();
-    }
-
-    @Override
-    public boolean isAvailable() {
-      synchronized (this) {
-        return !exclusiveLock && !runnables.isEmpty();
-      }
-    }
-
-    @Override
-    public int getPriority() {
-      return this.priority;
-    }
-
-    @Override
-    public void addFront(Procedure proc) {
-      this.runnables.addFirst(proc.getProcId());
-    }
-
-    @Override
-    public void addBack(Procedure proc) {
-      this.runnables.addLast(proc.getProcId());
-    }
-
-    @Override
-    public Long poll() {
-      return this.runnables.poll();
-    }
-
-    @Override
-    public synchronized boolean acquireDeleteLock() {
-      return tryExclusiveLock();
-    }
-
-    public synchronized boolean isLocked() {
-      return isExclusiveLock() || sharedLock > 0;
-    }
-
-    public synchronized boolean isExclusiveLock() {
-      return this.exclusiveLock;
-    }
-
-    public synchronized boolean trySharedLock() {
-      if (isExclusiveLock()) return false;
-      sharedLock++;
-      return true;
-    }
-
-    public synchronized void releaseSharedLock() {
-      sharedLock--;
-    }
-
-    /**
-     * @return True if only one instance of a shared lock outstanding.
-     */
-    synchronized boolean isSingleSharedLock() {
-      return sharedLock == 1;
-    }
-
-    public synchronized boolean tryExclusiveLock() {
-      if (isLocked()) return false;
-      exclusiveLock = true;
-      return true;
-    }
-
-    public synchronized void releaseExclusiveLock() {
-      exclusiveLock = false;
-    }
-
-    @Override
-    public String toString() {
-      return this.runnables.toString();
-    }
-  }
-
-  /**
-   * Run Queue for Server procedures.
-   */
-  private static class ServerRunQueue extends AbstractRunQueue {
-    public ServerRunQueue(int priority) {
-      super(priority);
-    }
-  }
-
-  /**
-   * Run Queue for a Table. It contains a read-write lock that is used by the
-   * MasterProcedureQueue to decide if we should fetch an item from this queue
-   * or skip to another one which will be able to run without waiting for locks.
-   */
-  private static class TableRunQueue extends AbstractRunQueue {
-    private TableLock tableLock = null;
-
-    public TableRunQueue(int priority) {
-      super(priority);
-    }
-
-    // TODO: Improve run-queue push with TableProcedureInterface.getType()
-    //       we can take smart decisions based on the type of the operation (e.g. create/delete)
-    @Override
-    public void addBack(final Procedure proc) {
-      super.addBack(proc);
-    }
-
-    public synchronized boolean trySharedLock(final TableLockManager lockManager,
-        final TableName tableName, final String purpose) {
-      if (isExclusiveLock()) return false;
-
-      // Take zk-read-lock
-      tableLock = lockManager.readLock(tableName, purpose);
-      try {
-        tableLock.acquire();
-      } catch (IOException e) {
-        LOG.error("failed acquire read lock on " + tableName, e);
-        tableLock = null;
-        return false;
-      }
-      trySharedLock();
-      return true;
-    }
-
-    public synchronized void releaseSharedLock(final TableLockManager lockManager,
-        final TableName tableName) {
-      releaseTableLock(lockManager, isSingleSharedLock());
-      releaseSharedLock();
-    }
-
-    public synchronized boolean tryExclusiveLock(final TableLockManager lockManager,
-        final TableName tableName, final String purpose) {
-      if (isLocked()) return false;
-      // Take zk-write-lock
-      tableLock = lockManager.writeLock(tableName, purpose);
-      try {
-        tableLock.acquire();
-      } catch (IOException e) {
-        LOG.error("failed acquire write lock on " + tableName, e);
-        tableLock = null;
-        return false;
-      }
-      tryExclusiveLock();
-      return true;
-    }
-
-    public synchronized void releaseExclusiveLock(final TableLockManager lockManager,
-        final TableName tableName) {
-      releaseTableLock(lockManager, true);
-      releaseExclusiveLock();
-    }
-
-    private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
-      for (int i = 0; i < 3; ++i) {
-        try {
-          tableLock.release();
-          if (reset) {
-            tableLock = null;
-          }
-          break;
-        } catch (IOException e) {
-          LOG.warn("Could not release the table write-lock", e);
-        }
-      }
-    }
-  }
-}


Mime
View raw message