hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From allan...@apache.org
Subject hbase git commit: HBASE-21354 Procedure may be deleted improperly during master restarts resulting in 'Corrupt'
Date Tue, 23 Oct 2018 02:56:02 GMT
Repository: hbase
Updated Branches:
  refs/heads/master ae13b0b29 -> 86f23128b


HBASE-21354 Procedure may be deleted improperly during master restarts resulting in 'Corrupt'


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/86f23128
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/86f23128
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/86f23128

Branch: refs/heads/master
Commit: 86f23128b0d66deb70790785e63d2f7e01d5ab8d
Parents: ae13b0b
Author: Allan Yang <allan163@apache.org>
Authored: Tue Oct 23 10:55:18 2018 +0800
Committer: Allan Yang <allan163@apache.org>
Committed: Tue Oct 23 10:55:18 2018 +0800

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java |  25 +-
 .../procedure2/store/wal/WALProcedureStore.java |  52 ++--
 .../procedure2/ProcedureTestingUtility.java     |  32 ++-
 .../hbase/procedure2/TestProcedureCleanup.java  | 242 +++++++++++++++++++
 4 files changed, 324 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/86f23128/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index a5b5825..64479b2 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -203,7 +203,7 @@ public class ProcedureStoreTracker {
    * then we mark it as deleted.
    * @see #setDeletedIfModified(long...)
    */
-  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
+  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, boolean globalTracker) {
     BitSetNode trackerNode = null;
     for (BitSetNode node : map.values()) {
       final long minProcId = node.getStart();
@@ -214,9 +214,26 @@ public class ProcedureStoreTracker {
         }
 
         trackerNode = tracker.lookupClosestNode(trackerNode, procId);
-        if (trackerNode == null || !trackerNode.contains(procId) ||
-          trackerNode.isModified(procId)) {
-          // the procedure was removed or modified
+        if (trackerNode == null || !trackerNode.contains(procId)) {
+          // the procId does not exist in the tracker; we can only delete the proc
+          // if globalTracker is set to true.
+          // Only if the procedure is not in the global tracker can we delete
+          // the procedure. In other cases, the procedure may not be updated in a single
+          // log, so we cannot delete it just because the log's tracker doesn't have
+          // any info for the procedure.
+          if (globalTracker) {
+            node.delete(procId);
+          }
+          continue;
+        }
+        // Only check delete in the global tracker, only global tracker has the
+        // whole picture
+        if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) {
+          node.delete(procId);
+          continue;
+        }
+        if (trackerNode.isModified(procId)) {
+          // the procedure was modified
           node.delete(procId);
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/86f23128/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 5a5face..0a89c3f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -97,7 +97,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
  * will first be initialized to the oldest file's tracker(which is stored in the trailer),
using the
  * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then
merge it
  * with the tracker of every newer wal files, using the
- * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we
find out
+ * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, boolean)}.
+ * If we find out
  * that all the modified procedures for the oldest wal file are modified or deleted in newer
wal
  * files, then we can delete it. This is because that, every time we call
  * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)},
we will
@@ -343,7 +344,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     // Close the writer
-    closeCurrentLogStream();
+    closeCurrentLogStream(abort);
 
     // Close the old logs
     // they should be already closed, this is just in case the load fails
@@ -398,7 +399,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   public void recoverLease() throws IOException {
     lock.lock();
     try {
-      LOG.trace("Starting WAL Procedure Store lease recovery");
+      LOG.debug("Starting WAL Procedure Store lease recovery");
       boolean afterFirstAttempt = false;
       while (isRunning()) {
         // Don't sleep before first attempt
@@ -433,7 +434,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           continue;
         }
 
-        LOG.trace("Lease acquired for flushLogId={}", flushLogId);
+        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
         break;
       }
     } finally {
@@ -451,7 +452,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
       // Nothing to do, If we have only the current log.
       if (logs.size() == 1) {
-        LOG.trace("No state logs to replay.");
+        LOG.debug("No state logs to replay.");
         loader.setMaxProcId(0);
         return;
       }
@@ -983,7 +984,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   @VisibleForTesting
-  boolean rollWriterForTesting() throws IOException {
+  public boolean rollWriterForTesting() throws IOException {
     lock.lock();
     try {
       return rollWriter();
@@ -1006,11 +1007,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
     if (storeTracker.isEmpty()) {
       LOG.trace("no active procedures");
       tryRollWriter();
-      removeAllLogs(flushLogId - 1);
+      removeAllLogs(flushLogId - 1, "no active procedures");
     } else {
       if (storeTracker.isAllModified()) {
         LOG.trace("all the active procedures are in the latest log");
-        removeAllLogs(flushLogId - 1);
+        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
       }
 
       // if the log size has exceeded the roll threshold
@@ -1091,7 +1092,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       return false;
     }
 
-    closeCurrentLogStream();
+    closeCurrentLogStream(false);
 
     storeTracker.resetModified();
     stream = newStream;
@@ -1124,7 +1125,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return true;
   }
 
-  private void closeCurrentLogStream() {
+  private void closeCurrentLogStream(boolean abort) {
     if (stream == null || logs.isEmpty()) {
       return;
     }
@@ -1133,8 +1134,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
       ProcedureWALFile log = logs.getLast();
       log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
       log.updateLocalTracker(storeTracker);
-      long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
-      log.addToSize(trailerSize);
+      if (!abort) {
+        long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
+        log.addToSize(trailerSize);
+      }
     } catch (IOException e) {
       LOG.warn("Unable to write the trailer", e);
     }
@@ -1153,6 +1156,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
     // once there is nothing olding the oldest WAL we can remove it.
     while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
+      LOG.info("Remove the oldest log {}", logs.getFirst());
       removeLogFile(logs.getFirst(), walArchiveDir);
       buildHoldingCleanupTracker();
     }
@@ -1170,24 +1174,38 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
     // compute the holding tracker.
     //  - the first WAL is used for the 'updates'
-    //  - the other WALs are scanned to remove procs already in other wals.
+    //  - the global tracker is passed in first to decide which procedures no longer
+    //    exist, so we can mark them as deleted in holdingCleanupTracker.
+    //    Only the global tracker has the whole picture here.
+    //  - the other WALs are scanned to remove procs already updated in a newer wal.
+    //    If it is updated in a newer wal, we can mark it as deleted in holdingCleanupTracker.
+    //    But we cannot delete it if it is shown as deleted in the newer wal, as said
+    //    above.
     // TODO: exit early if holdingCleanupTracker.isEmpty()
     holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
-    holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker);
+    //Passing in the global tracker, we can delete the procedures not in the global
+    //tracker, because they are deleted in the later logs
+    holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true);
     for (int i = 1, size = logs.size() - 1; i < size; ++i) {
-      holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker());
+      // Pass globalTracker as false since a single log's tracker is passed in.
+      // Since a specific procedure may not show up in the log at all (not executed or
+      // updated during the time), we cannot delete the procedure just because this log
+      // doesn't have the info of the procedure. We can delete the procedure only if
+      // this log's tracker clearly shows that the procedure is modified or deleted
+      // in the corresponding BitSetNode.
+      holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), false);
     }
   }
 
   /**
    * Remove all logs with logId <= {@code lastLogId}.
    */
-  private void removeAllLogs(long lastLogId) {
+  private void removeAllLogs(long lastLogId, String why) {
     if (logs.size() <= 1) {
       return;
     }
 
-    LOG.trace("Remove all state logs with ID less than {}", lastLogId);
+    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
 
     boolean removed = false;
     while (logs.size() > 1) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/86f23128/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 4d06e2f..843b724 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -67,19 +67,37 @@ public class ProcedureTestingUtility {
     });
   }
 
+  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
+      boolean abort, boolean startWorkers) throws Exception {
+    restart(procExecutor, false, true, null, null, null,  abort, startWorkers);
+  }
+
+  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
+      boolean abort) throws Exception {
+    restart(procExecutor, false, true, null, null, null, abort, true);
+  }
+
   public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
-    restart(procExecutor, false, true, null, null, null);
+    restart(procExecutor, false, true, null, null, null, false, true);
   }
 
   public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
       boolean abortOnCorruption) throws IOException {
+    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
+  }
+
+  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
+      boolean abortOnCorruption, boolean startWorkers) throws IOException {
     procExecutor.init(numThreads, abortOnCorruption);
-    procExecutor.startWorkers();
+    if (startWorkers) {
+      procExecutor.startWorkers();
+    }
   }
 
   public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
       boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
-      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction)
+      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction,
+      boolean abort, boolean startWorkers)
       throws Exception {
     final ProcedureStore procStore = procExecutor.getStore();
     final int storeThreads = procExecutor.getCorePoolSize();
@@ -93,7 +111,7 @@ public class ProcedureTestingUtility {
     // stop
     LOG.info("RESTART - Stop");
     procExecutor.stop();
-    procStore.stop(false);
+    procStore.stop(abort);
     if (stopAction != null) {
       stopAction.call();
     }
@@ -109,7 +127,9 @@ public class ProcedureTestingUtility {
     if (actionBeforeStartWorker != null) {
       actionBeforeStartWorker.call();
     }
-    procExecutor.startWorkers();
+    if (startWorkers) {
+      procExecutor.startWorkers();
+    }
     if (startAction != null) {
       startAction.call();
     }
@@ -207,7 +227,7 @@ public class ProcedureTestingUtility {
     NoopProcedureStore procStore = new NoopProcedureStore();
     ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
     procStore.start(1);
-    initAndStartWorkers(procExecutor, 1, false);
+    initAndStartWorkers(procExecutor, 1, false, true);
     try {
       return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/86f23128/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
new file mode 100644
index 0000000..e06fdc5
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureCleanup {
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
+      .forClass(TestProcedureCleanup.class);
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+
+  private static TestProcEnv procEnv;
+  private static WALProcedureStore procStore;
+
+  private static ProcedureExecutor<TestProcEnv> procExecutor;
+
+  private static HBaseCommonTestingUtility htu;
+
+  private static FileSystem fs;
+  private static Path testDir;
+  private static Path logDir;
+
+  private static class TestProcEnv {
+
+  }
+
+  private void createProcExecutor(String dir) throws Exception {
+    logDir = new Path(testDir, dir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
+        procStore);
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    ProcedureTestingUtility
+        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    htu = new HBaseCommonTestingUtility();
+
+    // NOTE: The executor will be created by each test
+    procEnv = new TestProcEnv();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+
+  }
+
+  @Test
+  public void testProcedureShouldNotCleanOnLoad() throws Exception {
+    createProcExecutor("testProcedureShouldNotCleanOnLoad");
+    final RootProcedure proc = new RootProcedure();
+    long rootProc = procExecutor.submitProcedure(proc);
+    LOG.info("Begin to execute " + rootProc);
+    // wait until the child procedure arrives
+    while(procExecutor.getProcedures().size() < 2) {
+      Thread.sleep(100);
+    }
+    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor
+        .getProcedures().get(1);
+    // wait until the suspendProcedure executed
+    suspendProcedure.latch.countDown();
+    Thread.sleep(100);
+    // roll the procedure log
+    LOG.info("Begin to roll log ");
+    procStore.rollWriterForTesting();
+    LOG.info("finish to roll log ");
+    Thread.sleep(500);
+    LOG.info("begin to restart1 ");
+    ProcedureTestingUtility.restart(procExecutor, true);
+    LOG.info("finish to restart1 ");
+    Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
+    Thread.sleep(500);
+    LOG.info("begin to restart2 ");
+    ProcedureTestingUtility.restart(procExecutor, true);
+    LOG.info("finish to restart2 ");
+    Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
+  }
+
+  @Test
+  public void testProcedureUpdatedShouldClean() throws Exception {
+    createProcExecutor("testProcedureUpdatedShouldClean");
+    SuspendProcedure suspendProcedure = new SuspendProcedure();
+    long suspendProc = procExecutor.submitProcedure(suspendProcedure);
+    LOG.info("Begin to execute " + suspendProc);
+    suspendProcedure.latch.countDown();
+    Thread.sleep(500);
+    LOG.info("begin to restart1 ");
+    ProcedureTestingUtility.restart(procExecutor, true);
+    LOG.info("finish to restart1 ");
+    while(procExecutor.getProcedure(suspendProc) == null) {
+      Thread.sleep(100);
+    }
+    // Wait until the suspendProc executed after restart
+    suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
+    suspendProcedure.latch.countDown();
+    Thread.sleep(500);
+    // Should be 1 log since the suspendProcedure is updated in the new log
+    Assert.assertTrue(procStore.getActiveLogs().size() == 1);
+    // restart procExecutor
+    LOG.info("begin to restart2");
+    // Restart the executor but do not start the workers.
+    // Otherwise, the suspendProcedure will soon be executed and the oldest log
+    // will be cleaned, leaving only the newest log.
+    ProcedureTestingUtility.restart(procExecutor, true, false);
+    LOG.info("finish to restart2");
+    // There should be two active logs
+    Assert.assertTrue(procStore.getActiveLogs().size() == 2);
+    procExecutor.startWorkers();
+
+  }
+
+  @Test
+  public void testProcedureDeletedShouldClean() throws Exception {
+    createProcExecutor("testProcedureDeletedShouldClean");
+    WaitProcedure waitProcedure = new WaitProcedure();
+    long waitProce = procExecutor.submitProcedure(waitProcedure);
+    LOG.info("Begin to execute " + waitProce);
+    Thread.sleep(500);
+    LOG.info("begin to restart1 ");
+    ProcedureTestingUtility.restart(procExecutor, true);
+    LOG.info("finish to restart1 ");
+    while(procExecutor.getProcedure(waitProce) == null) {
+      Thread.sleep(100);
+    }
+    // Fetch the reloaded waitProcedure and release its latch after restart
+    waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
+    waitProcedure.latch.countDown();
+    Thread.sleep(500);
+    // Should be 1 log since the suspendProcedure is updated in the new log
+    Assert.assertTrue(procStore.getActiveLogs().size() == 1);
+    // restart procExecutor
+    LOG.info("begin to restart2");
+    // Restart the executor but do not start the workers.
+    // Otherwise, the suspendProcedure will soon be executed and the oldest log
+    // will be cleaned, leaving only the newest log.
+    ProcedureTestingUtility.restart(procExecutor, true, false);
+    LOG.info("finish to restart2");
+    // There should be two active logs
+    Assert.assertTrue(procStore.getActiveLogs().size() == 2);
+    procExecutor.startWorkers();
+  }
+
+  public static class WaitProcedure
+      extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+    public WaitProcedure() {
+      super();
+    }
+
+    private CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureSuspendedException {
+      // Always wait here
+      LOG.info("wait here");
+      try {
+        latch.await();
+      } catch (Throwable t) {
+
+      }
+      LOG.info("finished");
+      return null;
+    }
+  }
+
+  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+    public SuspendProcedure() {
+      super();
+    }
+
+    private CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureSuspendedException {
+      // Always suspend the procedure
+      LOG.info("suspend here");
+      latch.countDown();
+      throw new ProcedureSuspendedException();
+    }
+  }
+
+  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+    private boolean childSpwaned = false;
+
+    public RootProcedure() {
+      super();
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureSuspendedException {
+      if (!childSpwaned) {
+        childSpwaned = true;
+        return new Procedure[] {new SuspendProcedure()};
+      } else {
+        return null;
+      }
+    }
+  }
+
+
+}


Mime
View raw message