hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1445818 [2/3] - in /hbase/branches/hbase-7290/hbase-server/src: main/java/org/apache/hadoop/hbase/procedure/ main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/procedure/
Date Wed, 13 Feb 2013 18:39:52 GMT
Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is a shared ZooKeeper-based znode management utility for distributed procedures.  All
+ * znode operations should go through the provided methods in coordinators and members.
+ *
+ * Layout of nodes in ZK is
+ * /hbase/[op name]/acquired/
+ *                    [op instance] - op data/
+ *                        /[nodes that have acquired]
+ *                 /reached/
+ *                    [op instance]/
+ *                        /[nodes that have completed]
+ *                 /abort/
+ *                    [op instance] - failure data
+ *
+ * NOTE: while acquired and reached are znode dirs, abort is actually just a znode.
+ *
+ * Assumption here that procedure names are unique
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class ZKProcedureUtil
+    extends ZooKeeperListener implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
+
+  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
+  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
+  public static final String ABORT_ZNODE_DEFAULT = "abort";
+
+  public final String baseZNode;
+  protected final String acquiredZnode;
+  protected final String reachedZnode;
+  protected final String abortZnode;
+
+  protected final String memberName;
+
+  /**
+   * Top-level watcher/controller for procedures across the cluster.
+   * <p>
+   * On instantiation, this ensures the procedure znodes exist.  This however requires calling
+   * {@link #start} to start monitoring for running procedures.
+   * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
+   *          {@link #close()}
+   * @param procDescription name of the znode describing the procedure to run
+   * @param memberName name of the member from which we are interacting with running procedures
+   * @throws KeeperException when the procedure znodes cannot be created
+   */
+  public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
+      String memberName) throws KeeperException {
+    super(watcher);
+    this.memberName = memberName;
+    // make sure we are listening for events
+    watcher.registerListener(this);
+    // setup paths for the zknodes used in procedures
+    this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
+    acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
+    reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
+    abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
+
+    // first make sure all the ZK nodes exist
+    // make sure all the parents exist (sometimes not the case in tests)
+    ZKUtil.createWithParents(watcher, acquiredZnode);
+    // regular create because all the parents exist
+    ZKUtil.createAndFailSilent(watcher, reachedZnode);
+    ZKUtil.createAndFailSilent(watcher, abortZnode);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (watcher != null) {
+      watcher.close();
+    }
+  }
+
+  public String getAcquiredBarrierNode(String opInstanceName) {
+    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
+  }
+
+  public String getReachedBarrierNode(String opInstanceName) {
+    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
+  }
+
+  public String getAbortZNode(String opInstanceName) {
+    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
+  }
+
+  public String getAbortZnode() {
+    return abortZnode;
+  }
+
+  public String getBaseZnode() {
+    return baseZNode;
+  }
+
+  public String getAcquiredBarrier() {
+    return acquiredZnode;
+  }
+
+  public String getMemberName() {
+    return memberName;
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator to trigger a global barrier
+   * acquire on each subprocedure.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the prepare barrier/start node
+   */
+  public static String getAcquireBarrierNode(ZKProcedureUtil controller,
+      String opInstanceName) {
+    return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator to trigger a global barrier
+   * execution and release on each subprocedure.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the commit barrier
+   */
+  public static String getReachedBarrierNode(ZKProcedureUtil controller,
+      String opInstanceName) {
+    return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator or member to trigger an abort
+   * of the global barrier acquisition or execution in subprocedures.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the abort znode
+   */
+  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
+    return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
+  }
+
+  public ZooKeeperWatcher getWatcher() {
+    return watcher;
+  }
+
+  /**
+   * Is this a procedure related znode path?
+   *
+   * TODO: this is not strict; it can return true for a path that merely starts with the same
+   * prefix as baseZNode but belongs to a different znode directory.
+   *
+   * @return true if starts with baseZnode
+   */
+  public boolean isInProcedurePath(String path) {
+    return path.startsWith(baseZNode);
+  }
+
+  /**
+   * Is this in the procedure barrier acquired znode path
+   */
+  public boolean isAcquiredPathNode(String path) {
+    return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier reached znode path
+   */
+  public boolean isReachedPathNode(String path) {
+    return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier abort znode path
+   */
+    public boolean isAbortPathNode(String path) {
+    return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
+  }
+
+  // --------------------------------------------------------------------------
+  // internal debugging methods
+  // --------------------------------------------------------------------------
+  /**
+   * Recursively print the current state of ZK (non-transactional)
+   * @param root name of the root directory in zk to print
+   * @throws RuntimeException wrapping any KeeperException hit while walking the tree
+   */
+  public void logZKTree(String root) {
+    if (!LOG.isDebugEnabled()) return;
+    LOG.debug("Current zk system:");
+    String prefix = "|-";
+    LOG.debug(prefix + root);
+    try {
+      logZKTree(root, prefix);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Helper method to print the current state of the ZK tree.
+   * @see #logZKTree(String)
+   * @throws KeeperException if an unexpected exception occurs
+   */
+  protected void logZKTree(String root, String prefix) throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
+    if (children == null) return;
+    for (String child : children) {
+      LOG.debug(prefix + child);
+      String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
+      logZKTree(node, prefix + "---");
+    }
+  }
+
+  public void clearChildZNodes() throws KeeperException {
+    // TODO This is potentially racy since not atomic. update when we support zk that has multi
+
+    // If the coordinator was shut down mid-procedure, then we are going to lose
+    // a procedure that was previously started by cleaning out all the previous state. It's much
+    // harder to figure out how to keep a procedure going; that is the subject of HBASE-5487.
+    ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
+    ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
+    ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
+  }
+
+  public void clearZNodes(String procedureName) throws KeeperException {
+    // TODO This is potentially racy since not atomic. update when we support zk that has multi
+
+    ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
+    ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
+    ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
+  }
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1445818&r1=1445817&r2=1445818&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Feb 13 18:39:51 2013
@@ -23,21 +23,16 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
-import java.net.InetAddress;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
-import javax.security.auth.login.LoginException;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +44,8 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -56,9 +53,9 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.server.ZooKeeperSaslServer;
 
 /**
@@ -849,6 +846,10 @@ public class ZKUtil {
   /**
    * Set data into node creating node if it doesn't yet exist.
    * Does not set watch.
+   *
+   * WARNING: this is not atomic -- it is possible to get a 0-byte data value in the znode before
+   * data is written
+   *
    * @param zkw zk reference
    * @param znode path of node
    * @param data data to set for node
@@ -1070,7 +1071,7 @@ public class ZKUtil {
   }
 
   /**
-   * Creates the specified node, if the node does not exist.  Does not set a
+   * Creates the specified node, iff the node does not exist.  Does not set a
    * watch and fails silently if the node already exists.
    *
    * The node created is persistent and open access.
@@ -1082,11 +1083,28 @@ public class ZKUtil {
   public static void createAndFailSilent(ZooKeeperWatcher zkw,
       String znode)
   throws KeeperException {
+    createAndFailSilent(zkw, znode, new byte[0]);
+  }
+
+  /**
+   * Creates the specified node containing specified data, iff the node does not exist.  Does
+   * not set a watch and fails silently if the node already exists.
+   *
+   * The node created is persistent and open access.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param data a byte array data to store in the znode
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void createAndFailSilent(ZooKeeperWatcher zkw,
+      String znode, byte[] data)
+  throws KeeperException {
     try {
       RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
       waitForZKConnectionIfAuthenticating(zkw);
       if (zk.exists(znode, false) == null) {
-        zk.create(znode, new byte[0], createACL(zkw,znode),
+        zk.create(znode, data, createACL(zkw,znode),
             CreateMode.PERSISTENT);
       }
     } catch(KeeperException.NodeExistsException nee) {
@@ -1118,12 +1136,30 @@ public class ZKUtil {
    */
   public static void createWithParents(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
+    createWithParents(zkw, znode, new byte[0]);
+  }
+
+  /**
+   * Creates the specified node and all parent nodes required for it to exist.  The creation of
+   * parent znodes is not atomic with the leaf znode creation but the data is written atomically
+   * when the leaf node is created.
+   *
+   * No watches are set and no errors are thrown if the node already exists.
+   *
+   * The nodes created are persistent and open access.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void createWithParents(ZooKeeperWatcher zkw, String znode, byte[] data)
+  throws KeeperException {
     try {
       if(znode == null) {
         return;
       }
       waitForZKConnectionIfAuthenticating(zkw);
-      zkw.getRecoverableZooKeeper().create(znode, new byte[0], createACL(zkw, znode),
+      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
           CreateMode.PERSISTENT);
     } catch(KeeperException.NodeExistsException nee) {
       return;
@@ -1422,4 +1458,37 @@ public class ZKUtil {
     ke.initCause(e);
     return ke;
   }
+
+  /**
+   * Recursively print the current state of ZK (non-transactional)
+   * @param root name of the root directory in zk to print
+   * @throws RuntimeException wrapping any KeeperException hit while walking the tree
+   */
+  public static void logZKTree(ZooKeeperWatcher zkw, String root) {
+    if (!LOG.isDebugEnabled()) return;
+    LOG.debug("Current zk system:");
+    String prefix = "|-";
+    LOG.debug(prefix + root);
+    try {
+      logZKTree(zkw, root, prefix);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Helper method to print the current state of the ZK tree.
+   * @see #logZKTree(ZooKeeperWatcher, String)
+   * @throws KeeperException if an unexpected exception occurs
+   */
+  protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
+    if (children == null) return;
+    for (String child : children) {
+      LOG.debug(prefix + child);
+      String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
+      logZKTree(zkw, node, prefix + "---");
+    }
+  }
+
 }

Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Demonstrate how Procedure handles single members, multiple members, and error semantics
+ */
+@Category(SmallTests.class)
+public class TestProcedure {
+
+  ProcedureCoordinator coord;
+
+  @Before
+  public void setup() {
+    coord = mock(ProcedureCoordinator.class);
+    final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
+    when(coord.getRpcs()).thenReturn(comms); // make it not null
+  }
+
+  class LatchedProcedure extends Procedure {
+    CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
+    CountDownLatch startedDuringBarrier = new CountDownLatch(1);
+    CountDownLatch completedProcedure = new CountDownLatch(1);
+
+    public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
+        long wakeFreq, long timeout, String opName, byte[] data,
+        List<String> expectedMembers) {
+      super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
+    }
+
+    @Override
+    public void sendGlobalBarrierStart() {
+      startedAcquireBarrier.countDown();
+    }
+
+    @Override
+    public void sendGlobalBarrierReached() {
+      startedDuringBarrier.countDown();
+    }
+
+    @Override
+    public void sendGlobalBarrierComplete() {
+      completedProcedure.countDown();
+    }
+  };
+
+  /**
+   * With a single member, verify ordered execution.  The Coordinator side is run in a separate
+   * thread so we can only trigger from members and wait for particular state latches.
+   */
+  @Test(timeout = 1000)
+  public void testSingleMember() throws Exception {
+    // The member
+    List<String> members =  new ArrayList<String>();
+    members.add("member");
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+    // coordinator: start the barrier procedure
+    new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    }.start();
+
+    // coordinator: wait for the barrier to be acquired, then send start barrier
+    proc.startedAcquireBarrier.await();
+
+    // we only know that {@link Procedure#sendGlobalBarrierStart()} was called; others are blocked.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+    verify(procspy, never()).barrierAcquiredByMember(anyString());
+
+    // member: trigger global barrier acquisition
+    proc.barrierAcquiredByMember(members.get(0));
+
+    // coordinator: wait for global barrier to be acquired.
+    proc.acquiredBarrierLatch.await();
+    verify(procspy).sendGlobalBarrierStart(); // old news
+
+    // since two threads, we cannot guarantee that {@link Procedure#sendGlobalBarrierReached()}
+    // was or was not called here.
+
+    // member: trigger global barrier release
+    proc.barrierReleasedByMember(members.get(0));
+
+    // coordinator: wait for procedure to be completed
+    proc.completedProcedure.await();
+    verify(procspy).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).receive(any(ForeignException.class));
+  }
+
+  @Test(timeout=1000)
+  public void testMultipleMember() throws Exception {
+    // 2 members
+    List<String> members =  new ArrayList<String>();
+    members.add("member1");
+    members.add("member2");
+
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+    // start the barrier procedure
+    new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    }.start();
+
+    // coordinator: wait for the barrier to be acquired, then send start barrier
+    procspy.startedAcquireBarrier.await();
+
+    // we only know that {@link Procedure#sendGlobalBarrierStart()} was called; others are blocked.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+    verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
+
+    // member0: [1/2] trigger global barrier acquisition.
+    procspy.barrierAcquiredByMember(members.get(0));
+
+    // coordinator not satisfied.
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy, never()).sendGlobalBarrierComplete();
+
+    // member 1: [2/2] trigger global barrier acquisition.
+    procspy.barrierAcquiredByMember(members.get(1));
+
+    // coordinator: wait for global barrier to be acquired.
+    procspy.startedDuringBarrier.await();
+    verify(procspy).sendGlobalBarrierStart(); // old news
+
+    // member 1, 2: trigger global barrier release
+    procspy.barrierReleasedByMember(members.get(0));
+    procspy.barrierReleasedByMember(members.get(1));
+
+    // coordinator wait for procedure to be completed
+    procspy.completedProcedure.await();
+    verify(procspy).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).receive(any(ForeignException.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testErrorPropagation() throws Exception {
+    List<String> members =  new ArrayList<String>();
+    members.add("member");
+    Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final Procedure procspy = spy(proc);
+
+    ForeignException cause = new ForeignException("SRC", "External Exception");
+    proc.receive(cause);
+
+    // start the barrier procedure
+    Thread t = new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    };
+    t.start();
+    t.join();
+
+    verify(procspy, never()).sendGlobalBarrierStart();
+    verify(procspy, never()).sendGlobalBarrierReached();
+    verify(procspy).sendGlobalBarrierComplete();
+  }
+
+  @Test(timeout = 1000)
+  public void testBarrieredErrorPropagation() throws Exception {
+    List<String> members =  new ArrayList<String>();
+    members.add("member");
+    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
+        Integer.MAX_VALUE, "op", null, members);
+    final LatchedProcedure procspy = spy(proc);
+
+    // start the barrier procedure
+    Thread t = new Thread() {
+      public void run() {
+        procspy.call();
+      }
+    };
+    t.start();
+
+    // now test that we can put an error in before the commit phase runs
+    procspy.startedAcquireBarrier.await();
+    ForeignException cause = new ForeignException("SRC", "External Exception");
+    procspy.receive(cause);
+    procspy.barrierAcquiredByMember(members.get(0));
+    t.join();
+
+    // verify state of all the objects
+    verify(procspy).sendGlobalBarrierStart();
+    verify(procspy).sendGlobalBarrierComplete();
+    verify(procspy, never()).sendGlobalBarrierReached();
+  }
+}
\ No newline at end of file

Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test Procedure coordinator operation.
+ * <p>
+ * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
+ * level parallelization this class will likely throw all kinds of errors.
+ */
+@Category(SmallTests.class)
+public class TestProcedureCoordinator {
+  // general test constants
+  private static final long WAKE_FREQUENCY = 1000;
+  private static final long TIMEOUT = 100000;
+  private static final long POOL_KEEP_ALIVE = 1;
+  private static final String nodeName = "node";
+  private static final String procName = "some op";
+  private static final byte[] procData = new byte[0];
+  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
+
+  // setup the mocks
+  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
+  private final Procedure task = mock(Procedure.class);
+  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
+
+  // handle to the coordinator for each test
+  private ProcedureCoordinator coordinator;
+
+  @After
+  public void resetTest() throws IOException {
+    // reset all the mocks used for the tests
+    reset(controller, task, monitor);
+    // close the open coordinator, if it was used
+    if (coordinator != null) coordinator.close();
+  }
+
+  private ProcedureCoordinator buildNewCoordinator() {
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+    return spy(new ProcedureCoordinator(controller, pool));
+  }
+
+  /**
+   * Currently we can only handle one procedure at a time.  This makes sure we handle that and
+   * reject submitting more: the second startProcedure call is expected to return null.
+   */
+  @Test
+  public void testThreadPoolSize() throws Exception {
+    coordinator = buildNewCoordinator(); // assign the field so resetTest() closes it
+    Procedure proc = new Procedure(coordinator,  monitor,
+        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
+    Procedure procSpy = spy(proc);
+
+    Procedure proc2 = new Procedure(coordinator,  monitor,
+        WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
+    Procedure procSpy2 = spy(proc2);
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+    .thenReturn(procSpy, procSpy2);
+
+    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
+    // null here means second procedure failed to start.
+    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
+      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
+  }
+
+  /**
+   * Check handling a connection failure correctly if we get it during the acquiring phase
+   */
+  @Test(timeout = 5000)
+  public void testUnreachableControllerDuringPrepare() throws Exception {
+    coordinator = buildNewCoordinator();
+    // setup the proc
+    List<String> expected = Arrays.asList("cohort");
+    Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, expected);
+    final Procedure procSpy = spy(proc);
+
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+        .thenReturn(procSpy);
+
+    // use the passed controller responses
+    IOException cause = new IOException("Failed to reach comms during acquire");
+    doThrow(cause).when(controller)
+        .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
+
+    // run the operation
+    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    proc.waitForCompleted();
+    verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
+    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
+    verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
+        anyListOf(String.class));
+  }
+
+  /**
+   * Check handling a connection failure correctly if we get it during the barrier phase
+   */
+  @Test(timeout = 5000)
+  public void testUnreachableControllerDuringCommit() throws Exception {
+    coordinator = buildNewCoordinator();
+
+    // setup the task and spy on it
+    List<String> expected = Arrays.asList("cohort");
+    final Procedure spy = spy(new Procedure(coordinator,
+        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
+
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+    .thenReturn(spy);
+
+    // the acquire rpc is answered for the cohort, but the 'barrier reached' rpc throws
+    IOException cause = new IOException("Failed to reach controller during commit");
+    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
+        .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
+    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+    // run the operation
+    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    task.waitForCompleted();
+    verify(spy, atLeastOnce()).receive(any(ForeignException.class));
+    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
+    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
+        eq(procData), anyListOf(String.class));
+    verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
+        anyListOf(String.class));
+  }
+
+  @Test(timeout = 1000)
+  public void testNoCohort() throws Exception {
+    runSimpleProcedure();
+  }
+
+  @Test(timeout = 1000)
+  public void testSingleCohortOrchestration() throws Exception {
+    runSimpleProcedure("one");
+  }
+
+  @Test(timeout = 1000)
+  public void testMultipleCohortOrchestration() throws Exception {
+    runSimpleProcedure("one", "two", "three", "four");
+  }
+
+  public void runSimpleProcedure(String... members) throws Exception {
+    coordinator = buildNewCoordinator();
+    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, Arrays.asList(members));
+    final Procedure spy = spy(task);
+    runCoordinatedProcedure(spy, members);
+  }
+
+  /**
+   * Test that if nodes join the barrier early we still correctly handle the progress
+   */
+  @Test(timeout = 1000)
+  public void testEarlyJoiningBarrier() throws Exception {
+    final String[] cohort = new String[] { "one", "two", "three", "four" };
+    coordinator = buildNewCoordinator();
+    final ProcedureCoordinator ref = coordinator;
+    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
+        TIMEOUT, procName, procData, Arrays.asList(cohort));
+    final Procedure spy = spy(task);
+
+    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
+      public void doWork() {
+        // then do some fun where we commit before all nodes have prepared
+        // "one" commits before anyone else is done
+        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
+        ref.memberFinishedBarrier(this.opName, this.cohort[0]);
+        // but "two" takes a while
+        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
+        // "three" jumps ahead
+        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
+        ref.memberFinishedBarrier(this.opName, this.cohort[2]);
+        // and "four" takes a while
+        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
+      }
+    };
+
+    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
+      @Override
+      public void doWork() {
+        ref.memberFinishedBarrier(opName, this.cohort[1]);
+        ref.memberFinishedBarrier(opName, this.cohort[3]);
+      }
+    };
+    runCoordinatedOperation(spy, prepare, commit, cohort);
+  }
+
+  /**
+   * Just run a procedure with the standard name and data, with no special task for the mock
+   * coordinator (it works just like a regular coordinator). For custom behavior see
+   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
+   * .
+   * @param spy Spy on a real {@link Procedure}
+   * @param cohort expected cohort members
+   * @throws Exception on failure
+   */
+  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
+    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
+      new BarrierAnswer(procName, cohort), cohort);
+  }
+
+  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
+      String... cohort) throws Exception {
+    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
+  }
+
+  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
+      String... cohort) throws Exception {
+    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
+  }
+
+  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
+      BarrierAnswer commitOperation, String... cohort) throws Exception {
+    List<String> expected = Arrays.asList(cohort);
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
+      .thenReturn(spy);
+
+    // use the passed controller responses
+    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
+    doAnswer(commitOperation).when(controller)
+        .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
+
+    // run the operation
+    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
+    // and wait for it to finish
+    task.waitForCompleted();
+
+    // make sure we mocked correctly
+    prepareOperation.ensureRan();
+    // we never got an exception
+    InOrder inorder = inOrder(spy, controller);
+    inorder.verify(spy).sendGlobalBarrierStart();
+    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
+    inorder.verify(spy).sendGlobalBarrierReached();
+    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
+  }
+
+  private abstract class OperationAnswer implements Answer<Void> {
+    private boolean ran = false;
+
+    public void ensureRan() {
+      assertTrue("Prepare mocking didn't actually run!", ran);
+    }
+
+    @Override
+    public final Void answer(InvocationOnMock invocation) throws Throwable {
+      this.ran = true;
+      doWork();
+      return null;
+    }
+
+    protected abstract void doWork() throws Throwable;
+  }
+
+  /**
+   * Just tell the current coordinator that each of the nodes has prepared
+   */
+  private class AcquireBarrierAnswer extends OperationAnswer {
+    protected final String[] cohort;
+    protected final String opName;
+
+    public AcquireBarrierAnswer(String opName, String... cohort) {
+      this.cohort = cohort;
+      this.opName = opName;
+    }
+
+    @Override
+    public void doWork() {
+      if (cohort == null) return;
+      for (String member : cohort) {
+        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
+      }
+    }
+  }
+
+  /**
+   * Just tell the current coordinator that each of the nodes has committed
+   */
+  private class BarrierAnswer extends OperationAnswer {
+    protected final String[] cohort;
+    protected final String opName;
+
+    public BarrierAnswer(String opName, String... cohort) {
+      this.cohort = cohort;
+      this.opName = opName;
+    }
+
+    @Override
+    public void doWork() {
+      if (cohort == null) return;
+      for (String member : cohort) {
+        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the procedure member, and its error handling mechanisms.
+ */
+@Category(SmallTests.class)
+public class TestProcedureMember {
+  private static final long WAKE_FREQUENCY = 100;
+  private static final long TIMEOUT = 100000;
+  private static final long POOL_KEEP_ALIVE = 1;
+
+  private final String op = "some op";
+  private final byte[] data = new byte[0];
+  private final ForeignExceptionDispatcher mockListener = Mockito
+      .spy(new ForeignExceptionDispatcher());
+  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
+  private final ProcedureMemberRpcs mockMemberComms = Mockito
+      .mock(ProcedureMemberRpcs.class);
+  private ProcedureMember member;
+  private ForeignExceptionDispatcher dispatcher;
+  Subprocedure spySub;
+
+  /**
+   * Reset all the mock objects and close the member under test, if one was built.
+   * @throws IOException if closing the member fails; surfaced rather than only printed
+   */
+  @After
+  public void resetTest() throws IOException {
+    // clear stubbing and recorded interactions on the class level mocks
+    reset(mockListener, mockBuilder, mockMemberComms);
+    // close the open member, if it was used
+    if (member != null) {
+      member.close();
+    }
+  }
+
+  /**
+   * Build a member using the class level mocks
+   * @return member to use for tests
+   */
+  private ProcedureMember buildCohortMember() {
+    String name = "node";
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
+  }
+
+  /**
+   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
+   */
+  private void buildCohortMemberPair() throws IOException {
+    dispatcher = new ForeignExceptionDispatcher();
+    String name = "node";
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+    when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
+    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
+    spySub = spy(subproc);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
+    addCommitAnswer();
+  }
+
+
+  /**
+   * Add an 'in barrier phase' response to the mock controller when it gets an acquired notification
+   */
+  private void addCommitAnswer() throws IOException {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        member.receivedReachedGlobalBarrier(op);
+        return null;
+      }
+    }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+  }
+
+  /**
+   * Test the normal sub procedure execution case.
+   */
+  @Test(timeout = 500)
+  public void testSimpleRun() throws Exception {
+    member = buildCohortMember();
+    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
+    EmptySubprocedure spy = spy(subproc);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+    // when we get a prepare, then start the commit phase
+    addCommitAnswer();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc1 = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc1);
+    // and wait for it to finish
+    subproc.waitForLocallyCompleted();
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spy);
+    order.verify(spy).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
+    order.verify(spy).insideBarrier();
+    order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
+    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
+        any(ForeignException.class));
+  }
+
+  /**
+   * Make sure we call cleanup etc, when we have an exception during
+   * {@link Subprocedure#acquireBarrier()}.
+   */
+  @Test(timeout = 1000)
+  public void testMemberPrepareException() throws Exception {
+    buildCohortMemberPair();
+
+    // mock an exception on Subprocedure's prepare
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in member acquireBarrier");
+          }
+        }).when(spySub).acquireBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    // Later phases not run
+    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Make sure we call cleanup etc, when sending the 'member acquired' notification fails.
+   */
+  @Test(timeout = 1000)
+  public void testSendMemberAcquiredCommsFailure() throws Exception {
+    buildCohortMemberPair();
+
+    // mock an exception when the member reports that it acquired the barrier
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in memeber prepare");
+          }
+        }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+
+    // Later phases not run
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Fail correctly if coordinator aborts the procedure.  The subprocedure will not interrupt a
+   * running {@link Subprocedure#acquireBarrier()} -- it needs to finish first, and then the abort
+   * is checked.  Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get
+   * rolled back via {@link Subprocedure#cleanup}.
+   */
+  @Test(timeout = 1000)
+  public void testCoordinatorAbort() throws Exception {
+    buildCohortMemberPair();
+
+    // mock that another node timed out or failed to prepare
+    final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // inject a remote error (this would have come from an external thread)
+            spySub.cancel("bogus message", oate);
+            // sleep the wake frequency since that is what we promised
+            Thread.sleep(WAKE_FREQUENCY);
+            return null;
+          }
+        }).when(spySub).waitForReachedGlobalBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    // Later phases not run
+    order.verify(spySub, never()).insideBarrier();
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Handle failures if a member's commit phase fails.
+   *
+   * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
+   * 2PC the transaction is committed just before the coordinator sends commit messages to the
+   * member.  Members are then responsible for reading their TX log.  This implementation actually
+   * rolls back, and thus breaks the normal TX guarantees.
+  */
+  @Test(timeout = 1000)
+  public void testMemberCommitException() throws Exception {
+    buildCohortMemberPair();
+
+    // mock an exception on Subprocedure's insideBarrier (the commit phase)
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            throw new IOException("Forced IOException in memeber prepare");
+          }
+        }).when(spySub).insideBarrier();
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    order.verify(spySub).insideBarrier();
+
+    // Later phases not run
+    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
+   *
+   * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
+   * 2PC the transaction is committed just before the coordinator sends commit messages to the
+   * member.  Members are then responsible for reading their TX log.  This implementation actually
+   * rolls back, and thus breaks the normal TX guarantees.
+  */
+  @Test(timeout = 1000)
+  public void testMemberCommitCommsFailure() throws Exception {
+    buildCohortMemberPair();
+    final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // inject a remote error (this would have come from an external thread)
+            spySub.cancel("commit comms fail", oate);
+            // sleep the wake frequency since that is what we promised
+            Thread.sleep(WAKE_FREQUENCY);
+            return null;
+          }
+        }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    member.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spySub);
+    order.verify(spySub).acquireBarrier();
+    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
+    order.verify(spySub).insideBarrier();
+    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
+    // error recovery path exercised
+    order.verify(spySub).cancel(anyString(), any(Exception.class));
+    order.verify(spySub).cleanup(any(Exception.class));
+  }
+
+  /**
+   * Fail correctly on getting an external error while waiting for the prepared latch
+   * @throws Exception on failure
+   */
+  @Test(timeout = 1000)
+  public void testPropagateConnectionErrorBackToManager() throws Exception {
+    // setup the operation
+    member = buildCohortMember();
+    ProcedureMember memberSpy = spy(member);
+
+    // setup the commit and the spy
+    final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
+    ForeignExceptionDispatcher dispSpy = spy(dispatcher);
+    Subprocedure commit = new EmptySubprocedure(member, dispatcher);
+    Subprocedure spy = spy(commit);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
+
+    // fail during the prepare phase
+    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
+    // and throw a connection error when we try to tell the controller about it
+    doThrow(new IOException("Controller is down!")).when(mockMemberComms)
+        .sendMemberAborted(eq(spy), any(ForeignException.class));
+
+
+    // run the operation
+    // build a new operation
+    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
+    memberSpy.submitSubprocedure(subproc);
+    // if the operation doesn't die properly, then this will timeout
+    memberSpy.closeAndWait(TIMEOUT);
+
+    // make sure everything ran in order
+    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
+    // make sure we acquire.
+    order.verify(spy).acquireBarrier();
+    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
+
+    // TODO Need to do another refactor to get this to propagate to the coordinator.
+    // make sure we pass a remote exception back to the controller
+//    order.verify(mockMemberComms).sendMemberAborted(eq(spy),
+//      any(ExternalException.class));
+//    order.verify(dispSpy).receiveError(anyString(),
+//        any(ExternalException.class), any());
+  }
+
+  /**
+   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
+   * correctly build a new task for the requested operation
+   * @throws Exception on failure
+   */
+  @Test
+  public void testNoTaskToBeRunFromRequest() throws Exception {
+    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
+    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
+      .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
+    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
+    // builder returns null
+    // build a new operation
+    Subprocedure subproc = member.createSubprocedure(op, data);
+    member.submitSubprocedure(subproc);
+    // throws an illegal state exception
+    try {
+      // build a new operation
+      Subprocedure subproc2 = member.createSubprocedure(op, data);
+      member.submitSubprocedure(subproc2);
+    } catch (IllegalStateException ise) {
+    }
+    // throws an illegal argument exception
+    try {
+      // build a new operation
+      Subprocedure subproc3 = member.createSubprocedure(op, data);
+      member.submitSubprocedure(subproc3);
+    } catch (IllegalArgumentException iae) {
+    }
+
+    // no request should reach the pool
+    verifyZeroInteractions(pool);
+    // get two abort requests
+    // TODO Need to do another refactor to get this to propagate to the coordinator.
+    // verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
+  }
+
+  /**
+   * Helper {@link Subprocedure} whose phase for each step is just empty
+   */
+  public class EmptySubprocedure extends SubprocedureImpl {
+    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
+      super( member, op, dispatcher,
+      // TODO the wake frequency and timeout passed here are arbitrary values.
+          WAKE_FREQUENCY, TIMEOUT);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1445818&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Wed Feb 13 18:39:51 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.TimeoutException;
+import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.internal.matchers.ArrayEquals;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
+ */
+@Category(MediumTests.class)
+public class TestZKProcedure {
+
+  private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
+  // shared by every test in this class and never reassigned, hence final
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  // name handed to the coordinator rpcs and to the coordinator's thread pool
+  private static final String COORDINATOR_NODE_NAME = "coordinator";
+  // keep-alive handed to the default pools; seconds per the original note - TODO confirm unit
+  private static final long KEEP_ALIVE = 100;
+  // size argument for the coordinator's default pool
+  private static final int POOL_SIZE = 1;
+  // raise this when stepping through the test in a debugger
+  private static final long TIMEOUT = 10000;
+  private static final long WAKE_FREQUENCY = 500;
+  // procedure name used when starting procedures and when stubbing the subprocedure factory
+  private static final String opName = "op";
+  // procedure arguments: passed to startProcedure() and matched by the buildSubprocedure() stubs
+  private static final byte[] data = new byte[] { 1, 2 };
+  // shorthand for "invoked exactly one time"
+  private static final VerificationMode once = Mockito.times(1);
+
+  /**
+   * Starts the mini ZooKeeper cluster of the shared test utility before the tests of this class
+   * run.
+   * @throws Exception if the cluster cannot be started
+   */
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    UTIL.startMiniZKCluster();
+  }
+
+  /**
+   * Shuts down the mini ZooKeeper cluster of the shared test utility after the tests of this
+   * class have run.
+   * @throws Exception if the cluster cannot be shut down
+   */
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Builds a {@link ZooKeeperWatcher} from the test utility's configuration, identified as
+   * "testing utility".
+   * <p>
+   * The {@link Abortable} handed to the watcher never reports itself as aborted and turns any
+   * abort request into a {@link RuntimeException}.
+   * @return a new watcher
+   * @throws IOException declared by this method; presumably raised by the watcher constructor
+   */
+  private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
+    // an abort is never expected while these tests run, so fail loudly if one is requested
+    Abortable failOnAbort = new Abortable() {
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+
+      @Override
+      public void abort(String why, Throwable e) {
+        throw new RuntimeException(
+            "Unexpected abort in distributed three phase commit test:" + why, e);
+      }
+    };
+    return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", failOnAbort);
+  }
+
+  /**
+   * Runs the commit scenario with no cohort members at all.
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testEmptyMemberSet() throws Exception {
+    runCommit();
+  }
+
+  /**
+   * Runs the commit scenario with a single cohort member named "one".
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testSingleMember() throws Exception {
+    runCommit("one");
+  }
+
+  /**
+   * Runs the commit scenario with four cohort members.
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testMultipleMembers() throws Exception {
+    runCommit("one", "two", "three", "four" );
+  }
+
+  /**
+   * Runs one procedure against the mini ZooKeeper cluster with the given cohort and verifies
+   * that the coordinator-side procedure and every member subprocedure went through the expected
+   * phases without reporting an error.
+   * <p>
+   * The coordinator, its rpcs and every member started so far are closed even when a
+   * verification fails.
+   * @param members names of the cohort members to start; may be empty
+   * @throws Exception on unexpected failure
+   */
+  private void runCommit(String... members) throws Exception {
+    // an explicit null array is treated the same as "no members"
+    if (members == null) {
+      members = new String[0];
+    }
+    List<String> expected = Arrays.asList(members);
+
+    // setup the constants
+    ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
+    String opDescription = "coordination test - " + members.length + " cohort members";
+
+    // start running the controller
+    ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
+        coordZkw, opDescription, COORDINATOR_NODE_NAME);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE,
+        POOL_SIZE, WAKE_FREQUENCY);
+    ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
+      @Override
+      public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName,
+          byte[] procArgs, List<String> expectedMembers) {
+        // spy on the real procedure so that its phase calls can be verified below
+        return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
+      }
+    };
+
+    // filled as members start, so the finally block closes exactly what was created
+    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers =
+        new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(members.length);
+    try {
+      // build and start members
+      // NOTE: There is a single subprocedure builder for all members here.
+      SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+      for (String member : members) {
+        ZooKeeperWatcher watcher = newZooKeeperWatcher();
+        ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+        ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1,
+            member);
+        ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
+        procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
+        comms.start(procMember);
+      }
+
+      // setup mock member subprocedures
+      final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
+      for (int i = 0; i < procMembers.size(); i++) {
+        ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+        // NOTE(review): wake frequency is passed before the timeout, matching the Procedure
+        // constructor call in this class and the other SubprocedureImpl subclass in this
+        // package's tests - confirm against the SubprocedureImpl constructor
+        Subprocedure commit = Mockito.spy(new SubprocedureImpl(procMembers.get(i).getFirst(),
+            opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
+        subprocs.add(commit);
+      }
+
+      // link subprocedure to buildNewOperation invocation.
+      // NOTE: would be racy if not an AtomicInteger
+      final AtomicInteger i = new AtomicInteger(0);
+      Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
+          (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+        new Answer<Subprocedure>() {
+          @Override
+          public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+            // hand out the prepared spies one by one, in invocation order
+            int index = i.getAndIncrement();
+            LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
+            return subprocs.get(index);
+          }
+        });
+
+      // start running the operation
+      Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data,
+          expected);
+
+      // verify all things ran as expected
+      waitAndVerifyProc(task, once, once, never(), once, false);
+      verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once,
+          false);
+    } finally {
+      // close all the things, whether or not the verification passed
+      closeAll(coordinator, coordinatorComms, procMembers);
+    }
+  }
+
+  /**
+   * Test a distributed commit with multiple cohort members, where one of the cohort members has a
+   * timeout exception during the prepare stage.
+   */
+  @Test
+  public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
+    String opDescription = "error injection coordination";
+    String[] cohortMembers = new String[] { "one", "two", "three" };
+    List<String> expected = Lists.newArrayList(cohortMembers);
+    // error constants
+    final int memberErrorIndex = 2;
+    final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
+
+    // start running the coordinator and its controller
+    ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
+    ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
+        coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
+
+    // start a member for each node
+    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
+    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
+        expected.size());
+    for (String member : expected) {
+      ZooKeeperWatcher watcher = newZooKeeperWatcher();
+      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
+      members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
+      controller.start(mem);
+    }
+
+    // setup mock subprocedures
+    final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
+    final int[] elem = new int[1];
+    for (int i = 0; i < members.size(); i++) {
+      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
+      ProcedureMember comms = members.get(i).getFirst();
+      Subprocedure commit = Mockito
+      .spy(new SubprocedureImpl(comms, opName, cohortMonitor, TIMEOUT, WAKE_FREQUENCY));
+      // This nasty bit has one of the impls throw a TimeoutException
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          int index = elem[0];
+          if (index == memberErrorIndex) {
+            LOG.debug("Sending error to coordinator");
+            ForeignException remoteCause = new ForeignException("TIMER",
+                new TimeoutException("subprocTimeout" , 1, 2, 0));
+            Subprocedure r = ((Subprocedure) invocation.getMock());
+            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
+            r.monitor.receive(remoteCause);
+            // don't complete the error phase until the coordinator has gotten the error
+            // notification (which ensures that we never progress past prepare)
+            try {
+              Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
+                  WAKE_FREQUENCY, "coordinator received error");
+            } catch (InterruptedException e) {
+              LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
+              // reset the interrupt status on the thread
+              Thread.currentThread().interrupt();
+            }
+          }
+          elem[0] = ++index;
+          return null;
+        }
+      }).when(commit).acquireBarrier();
+      cohortTasks.add(commit);
+    }
+
+    // pass out a task per member
+    final int[] i = new int[] { 0 };
+    Mockito.when(
+      subprocFactory.buildSubprocedure(Mockito.eq(opName),
+        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
+      new Answer<Subprocedure>() {
+        @Override
+        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
+          int index = i[0];
+          Subprocedure commit = cohortTasks.get(index);
+          index++;
+          i[0] = index;
+          return commit;
+        }
+      });
+
+    // setup spying on the coordinator
+    ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
+        .spy(new ForeignExceptionDispatcher());
+    Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
+        coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
+        opName, data, expected));
+    when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
+      .thenReturn(coordinatorTask);
+    // count down the error latch when we get the remote error
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // pass on the error to the master
+        invocation.callRealMethod();
+        // then count down the got error latch
+        coordinatorReceivedErrorLatch.countDown();
+        return null;
+      }
+    }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
+
+    // ----------------------------
+    // start running the operation
+    // ----------------------------
+
+    Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
+    assertEquals("Didn't mock coordinator task", coordinatorTask, task);
+
+    // wait for the task to complete
+    try {
+      task.waitForCompleted();
+    } catch (ForeignException fe) {
+      // this may get caught or may not 
+    }
+
+    // -------------
+    // verification
+    // -------------
+    waitAndVerifyProc(coordinatorTask, once, never(), once, once, true);
+    verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
+      once, true);
+
+    // close all the open things
+    closeAll(coordinator, coordinatorController, members);
+  }
+
+  /**
+   * Wait for the coordinator task to complete, and verify all the mocks
+   * @param task to wait on
+   * @throws Exception on unexpected failure
+   */
+  private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
+      VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+      throws Exception {
+    boolean caughtError = false;
+    try {
+      proc.waitForCompleted();
+    } catch (ForeignException fe) {
+      caughtError = true;
+    }
+    // make sure that the task called all the expected phases
+    Mockito.verify(proc, prepare).sendGlobalBarrierStart();
+    Mockito.verify(proc, commit).sendGlobalBarrierReached();
+    Mockito.verify(proc, finish).sendGlobalBarrierComplete();
+    assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
+        .hasException());
+    assertEquals("Operation error state was unexpected", opHasError, caughtError);
+
+  }
+
+  /**
+   * Wait for the coordinator task to complete, and verify all the mocks
+   * @param task to wait on
+   * @throws Exception on unexpected failure
+   */
+  private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
+      VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
+      throws Exception {
+    boolean caughtError = false;
+    try {
+      op.waitForLocallyCompleted();
+    } catch (ForeignException fe) {
+      caughtError = true;
+    }
+    // make sure that the task called all the expected phases
+    Mockito.verify(op, prepare).acquireBarrier();
+    Mockito.verify(op, commit).insideBarrier();
+    // We cannot guarantee that cleanup has run so we don't check it.
+
+    assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
+        .hasException());
+    assertEquals("Operation error state was unexpected", opHasError, caughtError);
+
+  }
+
+  /**
+   * Verifies that the subprocedure factory was asked for exactly one subprocedure per cohort
+   * member, then waits for and verifies each subprocedure in turn.
+   * <p>
+   * Despite the name, this is also used for the error scenario: {@code opHasError} is passed
+   * through to the per-subprocedure check.
+   * @param cohortNames names of the cohort members; only the count is used
+   * @param subprocFactory mocked factory whose invocations are verified
+   * @param cohortTasks spied subprocedures to check
+   * @param prepare passed through to the per-subprocedure check
+   * @param commit passed through to the per-subprocedure check
+   * @param cleanup passed through to the per-subprocedure check
+   * @param finish passed through to the per-subprocedure check
+   * @param opHasError whether each subprocedure is expected to have ended with an error
+   * @throws Exception on unexpected failure
+   */
+  private void verifyCohortSuccessful(List<String> cohortNames,
+      SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
+      VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
+      VerificationMode finish, boolean opHasError) throws Exception {
+
+    // one buildSubprocedure() call is expected per cohort member
+    VerificationMode oncePerMember = Mockito.times(cohortNames.size());
+    Mockito.verify(subprocFactory, oncePerMember).buildSubprocedure(
+      Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
+
+    // check every subprocedure, logging its position first
+    int position = 0;
+    for (Subprocedure subproc : cohortTasks) {
+      LOG.debug("Checking mock:" + position);
+      position++;
+      waitAndVerifySubproc(subproc, prepare, commit, cleanup, finish, opHasError);
+    }
+  }
+
+  /**
+   * Closes every cohort member together with its rpcs, then the coordinator and its rpcs.
+   * <p>
+   * The closes are chained through {@code finally} blocks: a member's rpcs are closed even if
+   * closing the member fails, and the coordinator and its rpcs are closed even if closing a
+   * member fails. A failure while closing one member still stops the loop, so later members are
+   * not closed. If several closes fail, the exception of the last failing close is the one
+   * thrown.
+   * @param coordinator coordinator to close
+   * @param coordinatorController rpcs of the coordinator to close
+   * @param cohort members and their rpcs to close
+   * @throws IOException if closing any of the resources fails
+   */
+  private void closeAll(
+      ProcedureCoordinator coordinator,
+      ZKProcedureCoordinatorRpcs coordinatorController,
+      List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
+      throws IOException {
+    try {
+      for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
+        try {
+          member.getFirst().close();
+        } finally {
+          member.getSecond().close();
+        }
+      }
+    } finally {
+      // always reached, so the coordinator side is not leaked by a failing member
+      try {
+        coordinator.close();
+      } finally {
+        coordinatorController.close();
+      }
+    }
+  }
+}



Mime
View raw message