Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 73702 invoked from network); 24 Jul 2008 11:06:55 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Jul 2008 11:06:55 -0000 Received: (qmail 14978 invoked by uid 500); 24 Jul 2008 11:06:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 14923 invoked by uid 500); 24 Jul 2008 11:06:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 14914 invoked by uid 99); 24 Jul 2008 11:06:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2008 04:06:55 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2008 11:06:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DF28A238889D; Thu, 24 Jul 2008 04:06:34 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r679354 - in /activemq/sandbox/zookeeper/zookeeper-protocols/src: main/java/org/apache/zookeeper/protocols/WriteLock.java main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java test/java/org/apache/zookeeper/protocols/WriteLockTest.java Date: Thu, 24 Jul 2008 11:06:34 -0000 To: commits@activemq.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080724110634.DF28A238889D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Thu Jul 24 04:06:33 2008 New Revision: 679354 URL: http://svn.apache.org/viewvc?rev=679354&view=rev Log: added fix for https://issues.apache.org/jira/browse/ZOOKEEPER-89 where we also call WhenOwnerListener.whenNotOwner() if someone calls unlock() explicitly. Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java URL: http://svn.apache.org/viewvc/activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java?rev=679354&r1=679353&r2=679354&view=diff ============================================================================== --- activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java (original) +++ activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java Thu Jul 24 04:06:33 2008 @@ -53,8 +53,12 @@ public WriteLock(ZooKeeperFacade zookeeper, String dir, WhenOwnerListener whenOwnerListener) { super(zookeeper); this.dir = dir; - this.whenOwnerListener = whenOwnerListener; - zookeeper.addWhenOwnerListener(whenOwnerListener); + + // lets minimise the amount of events that the application developer gets + // so they only know the times when they really become the leader and the first time + // that they are no longer the leader + this.whenOwnerListener = new WhenOwnerDelegate(whenOwnerListener); + zookeeper.addWhenOwnerListener(this.whenOwnerListener); } /** @@ -62,6 +66,8 @@ */ public void unlock() { if (!isClosed() && id != null) { + whenOwnerListener.whenNotOwner(); + // we don't need to retry this operation in the case of failure // as ZK will remove ephemeral files and we don't wanna hang // this process when closing if we cannot reconnect to ZK @@ -91,7 +97,7 @@ } ensurePathExists(dir); - return (Boolean) retryOperation(new ZooKeeperOperation() { + boolean answer = (Boolean) retryOperation(new ZooKeeperOperation() { public Object execute() throws KeeperException, InterruptedException { do { if (id == null) { @@ -179,6 +185,11 @@ return Boolean.FALSE; } }); + + if (!answer) { + whenOwnerListener.whenNotOwner(); + } + return answer; } public String getDir() { @@ -214,7 +225,6 @@ @Override protected void doClose() { - whenOwnerListener.whenNotOwner(); zookeeper.removeWhenOwnerListener(whenOwnerListener); unlock(); } Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java URL: http://svn.apache.org/viewvc/activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java?rev=679354&r1=679353&r2=679354&view=diff ============================================================================== --- activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java (original) +++ activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java Thu Jul 24 04:06:33 2008 @@ -17,17 +17,17 @@ */ package org.apache.zookeeper.protocols; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.proto.WatcherEvent; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.WatcherEvent; import java.io.IOException; -import java.util.List; import java.util.ArrayList; +import java.util.List; /** * A facade around the {@link ZooKeeper} instance which can deal with sessions expiring so that the facade is capable @@ -39,7 +39,8 @@ private ZooKeeper zookeeper; private final String host; private final int sessionTimeout; - private volatile Watcher defaultWatcher; + private final Watcher defaultWatcher; + private final Watcher watcher; private long sessionId; private byte[] sessionPasswd; private List whenOwnerListeners = new ArrayList(); @@ -48,6 +49,7 @@ this.host = host; this.sessionTimeout = sessionTimeout; this.defaultWatcher = watcher; + this.watcher = createWatcher(watcher); zookeeper = createZooKeeper(); } @@ -57,12 +59,13 @@ this.defaultWatcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; + this.watcher = createWatcher(watcher); zookeeper = createZooKeeper(); } /** * Returns the {@link ZooKeeper} instance which can be recreated under your feet if a session expires and - * a call is made to {@link #reconnectWithNewSession()} + * a call is made to {@link #reconnectWithNewSession()} */ public synchronized ZooKeeper getZookeeper() { return zookeeper; @@ -99,13 +102,12 @@ * @throws IOException */ public synchronized void reconnectWithNewSession() throws InterruptedException, IOException { - for (WhenOwnerListener listener : whenOwnerListeners) { - listener.whenNotOwner(); - } + fireNotOwner(); close(); // now lets force the reconnection sessionId = 0; + zookeeper = null; zookeeper = createZooKeeper(); } @@ -118,11 +120,7 @@ public void close() throws InterruptedException { if (zookeeper != null) { - try { - zookeeper.close(); - } finally { - zookeeper = null; - } + zookeeper.close(); } } @@ -253,16 +251,47 @@ public String toString() { return getZookeeper().toString(); } - + // Implementation methods //------------------------------------------------------------------------- protected ZooKeeper createZooKeeper() throws IOException { if (sessionId != 0 || sessionPasswd != null) { - return new ZooKeeper(host, sessionTimeout, defaultWatcher, sessionId, sessionPasswd); + return new ZooKeeper(host, sessionTimeout, watcher, sessionId, sessionPasswd); + } else { + return new ZooKeeper(host, sessionTimeout, watcher); } - else { - return new ZooKeeper(host, sessionTimeout, defaultWatcher); + } + + /** + * Creates the default watcher to use + * + * @param defaultWatcher the default watcher which is invoked if it is not null + * @return a newly created watcher + */ + protected Watcher createWatcher(final Watcher defaultWatcher) { + return new Watcher() { + public void process(WatcherEvent event) { + // fire not owner notifications if the connection disconnects + if (event.getState() != Watcher.Event.KeeperStateSyncConnected) { + fireNotOwner(); + } + + int state = event.getState(); + if (defaultWatcher != null) { + defaultWatcher.process(event); + } + } + }; + } + + + /** + * Fires the not owner event to any {@link WhenOwnerListener instances} + */ + protected void fireNotOwner() { + for (WhenOwnerListener listener : whenOwnerListeners) { + listener.whenNotOwner(); } } } Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java?rev=679354&r1=679353&r2=679354&view=diff ============================================================================== --- activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java (original) +++ activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java Thu Jul 24 04:06:33 2008 @@ -55,11 +55,12 @@ }); WriteLock leader = new WriteLock(keeper, dir, new WhenOwnerListener() { public void whenOwner() { + LOG.info(">>>>>>>>>> OWNER ZNode " + nodeId); latch.countDown(); } public void whenNotOwner() { - LOG.info("ZNode " + nodeId + " is no longer the owner!"); + LOG.info("<<<<<<<<<< NOT OWNER ZNode " + nodeId); } }); nodes[i] = leader; @@ -84,24 +85,24 @@ if (count > 1) { if (killLeader) { - System.out.println("Now killing the leader"); - // now lets kill the leader - latch = new CountDownLatch(1); - first.unlock(); - latch.await(30, TimeUnit.SECONDS); + System.out.println("Now killing the leader"); + // now lets kill the leader + latch = new CountDownLatch(1); + first.unlock(); + latch.await(30, TimeUnit.SECONDS); - //Thread.sleep(10000); - WriteLock second = nodes[1]; + //Thread.sleep(10000); + WriteLock second = nodes[1]; - dumpNodes(count); + dumpNodes(count); - // lets assert that the first election is the leader - assertTrue("The second znode should be the leader " + second.getId(), second.isOwner()); + // lets assert that the first election is the leader + assertTrue("The second znode should be the leader " + second.getId(), second.isOwner()); - for (int i = 2; i < count; i++) { - WriteLock node = nodes[i]; - assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); - } + for (int i = 2; i < count; i++) { + WriteLock node = nodes[i]; + assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); + } }