hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [2/2] hbase git commit: HBASE-16533 Procedure v2 - Extract chore from the executor
Date Wed, 31 Aug 2016 02:13:59 GMT
HBASE-16533 Procedure v2 - Extract chore from the executor


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aeecd4df
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aeecd4df
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aeecd4df

Branch: refs/heads/branch-1
Commit: aeecd4df838f0dae8287c79036e45f6d6b634eb3
Parents: 9907a7e
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Tue Aug 30 18:40:51 2016 -0700
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Tue Aug 30 19:02:31 2016 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     |  62 +++++------
 .../procedure2/ProcedureInMemoryChore.java      |  69 ++++++++++++
 .../procedure2/util/TimeoutBlockingQueue.java   |  15 +++
 .../procedure2/TestProcedureInMemoryChore.java  | 110 +++++++++++++++++++
 .../util/TestTimeoutBlockingQueue.java          |  22 ++++
 5 files changed, 242 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index c195f65..ee70dd1 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -127,7 +125,8 @@ public class ProcedureExecutor<TEnvironment> {
    * the master (e.g. master failover) so, if we delay a bit the real deletion of
    * the proc result the client will be able to get the result the next try.
    */
-  private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
+  private static class CompletedProcedureCleaner<TEnvironment>
+      extends ProcedureInMemoryChore<TEnvironment> {
     private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
 
     private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
@@ -148,14 +147,15 @@ public class ProcedureExecutor<TEnvironment> {
         final Map<Long, ProcedureInfo> completedMap,
         final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
       // set the timeout interval that triggers the periodic-procedure
-      setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+      super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
       this.completed = completedMap;
       this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
       this.store = store;
       this.conf = conf;
     }
 
-    public void periodicExecute(final TEnvironment env) {
+    @Override
+    protected void periodicExecute(final TEnvironment env) {
       if (completed.isEmpty()) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("No completed procedures to cleanup.");
@@ -189,31 +189,6 @@ public class ProcedureExecutor<TEnvironment> {
         }
       }
     }
-
-    @Override
-    protected Procedure[] execute(final TEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void rollback(final TEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(final TEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void serializeStateData(final OutputStream stream) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void deserializeStateData(final InputStream stream) {
-      throw new UnsupportedOperationException();
-    }
   }
 
   /**
@@ -526,9 +501,8 @@ public class ProcedureExecutor<TEnvironment> {
       threads[i].start();
     }
 
-    // Add completed cleaner
-    waitingTimeout.add(
-      new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
+    // Add completed cleaner chore
+    addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
   }
 
   public void stop() {
@@ -617,6 +591,22 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   /**
+   * Add a chore procedure to the executor
+   * @param chore the chore to add
+   */
+  public void addChore(final ProcedureInMemoryChore chore) {
+    waitingTimeout.add(chore);
+  }
+
+  /**
+   * Remove a chore procedure from the executor
+   * @param chore the chore to remove
+   */
+  public void removeChore(final ProcedureInMemoryChore chore) {
+    waitingTimeout.remove(chore);
+  }
+
+  /**
    * Add a new root-procedure to the executor.
    * @param proc the new procedure to execute.
    * @return the procedure id, that can be used to monitor the operation
@@ -906,12 +896,12 @@ public class ProcedureExecutor<TEnvironment> {
       // will have the tracker saying everything is in the last log.
       // ----------------------------------------------------------------------------
 
-      // The CompletedProcedureCleaner is a special case, and it acts as a chore.
+      // The ProcedureInMemoryChore is a special case, and it acts as a chore.
       // instead of bringing the Chore class in, we reuse this timeout thread for
       // this special case.
-      if (proc instanceof CompletedProcedureCleaner) {
+      if (proc instanceof ProcedureInMemoryChore) {
         try {
-          ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
+          ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment());
         } catch (Throwable e) {
           LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
new file mode 100644
index 0000000..bdced10
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Special procedure used as a chore.
+ * instead of bringing the Chore class in (dependencies reason),
+ * we reuse the executor timeout thread for this special case.
+ *
+ * The assumption is that procedure is used as hook to dispatch other procedures
+ * or trigger some cleanups. It does not store state in the ProcedureStore.
+ * this is just for in-memory chore executions.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEnvironment> {
+  protected ProcedureInMemoryChore(final int timeoutMsec) {
+    setTimeout(timeoutMsec);
+  }
+
+  protected abstract void periodicExecute(final TEnvironment env);
+
+  @Override
+  protected Procedure[] execute(final TEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void rollback(final TEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(final TEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
index f710ef4..fceabb1 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
@@ -92,6 +92,20 @@ public class TimeoutBlockingQueue<E> {
     }
   }
 
+  public void remove(E e) {
+    lock.lock();
+    try {
+      for (int i = 0; i < objects.length; ++i) {
+        if (objects[i] == e) {
+          objects[i] = null;
+          return;
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
   public E poll() {
     lock.lock();
@@ -210,6 +224,7 @@ public class TimeoutBlockingQueue<E> {
   }
 
   private long getNanosTimeout(final E obj) {
+    if (obj == null) return 0;
     TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
     long timeout = timeoutRetriever.getTimeout(obj);
     return unit.toNanos(timeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
new file mode 100644
index 0000000..32e3e8c
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureInMemoryChore {
+  private static final Log LOG = LogFactory.getLog(TestProcedureInMemoryChore.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+
+  private TestProcEnv procEnv;
+  private NoopProcedureStore procStore;
+  private ProcedureExecutor<TestProcEnv> procExecutor;
+
+  private HBaseCommonTestingUtility htu;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+
+    procEnv = new TestProcEnv();
+    procStore = new NoopProcedureStore();
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+    procExecutor.testing = new ProcedureExecutor.Testing();
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+  }
+
+  @Test
+  public void testChoreAddAndRemove() throws Exception {
+    final int timeoutMSec = 50;
+    final int nCountDown = 5;
+
+    // submit the chore and wait for execution
+    CountDownLatch latch = new CountDownLatch(nCountDown);
+    TestLatchChore chore = new TestLatchChore(timeoutMSec, latch);
+    procExecutor.addChore(chore);
+    latch.await();
+
+    // remove the chore and verify it is no longer executed
+    procExecutor.removeChore(chore);
+    latch = new CountDownLatch(nCountDown);
+    chore.setLatch(latch);
+    latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS);
+    LOG.info("chore latch count=" + latch.getCount());
+    assertTrue(latch.getCount() > 0);
+  }
+
+  public static class TestLatchChore extends ProcedureInMemoryChore<TestProcEnv> {
+    private CountDownLatch latch;
+
+    public TestLatchChore(final int timeoutMSec, final CountDownLatch latch) {
+      super(timeoutMSec);
+      setLatch(latch);
+    }
+
+    public void setLatch(final CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    protected void periodicExecute(final TestProcEnv env) {
+      latch.countDown();
+    }
+  }
+
+  private static class TestProcEnv {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
index 688e23a..5750650 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
@@ -133,4 +133,26 @@ public class TestTimeoutBlockingQueue {
       }
     }
   }
+
+  @Test
+  public void testRemove() {
+    TimeoutBlockingQueue<TestObject> queue =
+      new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
+
+    TestObject[] objs = new TestObject[5];
+    for (int i = 0; i < objs.length; ++i) {
+      objs[i] = new TestObject(0, i * 10);
+      queue.add(objs[i]);
+    }
+    queue.dump();
+
+    for (int i = 0; i < objs.length; i += 2) {
+      queue.remove(objs[i]);
+    }
+
+    for (int i = 0; i < objs.length; ++i) {
+      TestObject x = queue.poll();
+      assertEquals((i % 2) == 0 ? null : objs[i], x);
+    }
+  }
 }


Mime
View raw message