activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r679354 - in /activemq/sandbox/zookeeper/zookeeper-protocols/src: main/java/org/apache/zookeeper/protocols/WriteLock.java main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java test/java/org/apache/zookeeper/protocols/WriteLockTest.java
Date Thu, 24 Jul 2008 11:06:34 GMT
Author: jstrachan
Date: Thu Jul 24 04:06:33 2008
New Revision: 679354

URL: http://svn.apache.org/viewvc?rev=679354&view=rev
Log:
added fix for https://issues.apache.org/jira/browse/ZOOKEEPER-89 where we also call WhenOwnerListener.whenNotOwner()
if someone calls unlock() explicitly.

Modified:
    activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java
    activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java
    activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java

Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java?rev=679354&r1=679353&r2=679354&view=diff
==============================================================================
--- activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java
(original)
+++ activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/WriteLock.java
Thu Jul 24 04:06:33 2008
@@ -53,8 +53,12 @@
     public WriteLock(ZooKeeperFacade zookeeper, String dir, WhenOwnerListener whenOwnerListener)
{
         super(zookeeper);
         this.dir = dir;
-        this.whenOwnerListener = whenOwnerListener;
-        zookeeper.addWhenOwnerListener(whenOwnerListener);
+
+        // lets minimise the amount of events that the application developer gets
+        // so they only know the times when they really become the leader and the first time
+        // that they are no longer the leader
+        this.whenOwnerListener = new WhenOwnerDelegate(whenOwnerListener);
+        zookeeper.addWhenOwnerListener(this.whenOwnerListener);
     }
 
     /**
@@ -62,6 +66,8 @@
      */
     public void unlock() {
         if (!isClosed() && id != null) {
+            whenOwnerListener.whenNotOwner();
+
             // we don't need to retry this operation in the case of failure
             // as ZK will remove ephemeral files and we don't wanna hang
             // this process when closing if we cannot reconnect to ZK
@@ -91,7 +97,7 @@
         }
         ensurePathExists(dir);
 
-        return (Boolean) retryOperation(new ZooKeeperOperation() {
+        boolean answer = (Boolean) retryOperation(new ZooKeeperOperation() {
             public Object execute() throws KeeperException, InterruptedException {
                 do {
                     if (id == null) {
@@ -179,6 +185,11 @@
                 return Boolean.FALSE;
             }
         });
+
+        if (!answer) {
+            whenOwnerListener.whenNotOwner();
+        }
+        return answer;
     }
 
     public String getDir() {
@@ -214,7 +225,6 @@
 
     @Override
     protected void doClose() {
-        whenOwnerListener.whenNotOwner();
         zookeeper.removeWhenOwnerListener(whenOwnerListener);
         unlock();
     }

Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java?rev=679354&r1=679353&r2=679354&view=diff
==============================================================================
--- activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java
(original)
+++ activemq/sandbox/zookeeper/zookeeper-protocols/src/main/java/org/apache/zookeeper/protocols/ZooKeeperFacade.java
Thu Jul 24 04:06:33 2008
@@ -17,17 +17,17 @@
  */
 package org.apache.zookeeper.protocols;
 
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.WatcherEvent;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A facade around the {@link ZooKeeper} instance which can deal with sessions expiring so
that the facade is capable
@@ -39,7 +39,8 @@
     private ZooKeeper zookeeper;
     private final String host;
     private final int sessionTimeout;
-    private volatile Watcher defaultWatcher;
+    private final Watcher defaultWatcher;
+    private final Watcher watcher;
     private long sessionId;
     private byte[] sessionPasswd;
     private List<WhenOwnerListener> whenOwnerListeners = new ArrayList<WhenOwnerListener>();
@@ -48,6 +49,7 @@
         this.host = host;
         this.sessionTimeout = sessionTimeout;
         this.defaultWatcher = watcher;
+        this.watcher = createWatcher(watcher);
         zookeeper = createZooKeeper();
     }
 
@@ -57,12 +59,13 @@
         this.defaultWatcher = watcher;
         this.sessionId = sessionId;
         this.sessionPasswd = sessionPasswd;
+        this.watcher = createWatcher(watcher);
         zookeeper = createZooKeeper();
     }
 
     /**
      * Returns the {@link ZooKeeper} instance which can be recreated under your feet if a
session expires and
-     * a call is made to {@link #reconnectWithNewSession()} 
+     * a call is made to {@link #reconnectWithNewSession()}
      */
     public synchronized ZooKeeper getZookeeper() {
         return zookeeper;
@@ -99,13 +102,12 @@
      * @throws IOException
      */
     public synchronized void reconnectWithNewSession() throws InterruptedException, IOException
{
-        for (WhenOwnerListener listener : whenOwnerListeners) {
-            listener.whenNotOwner();
-        }
+        fireNotOwner();
         close();
 
         // now lets force the reconnection
         sessionId = 0;
+        zookeeper = null;
         zookeeper = createZooKeeper();
     }
 
@@ -118,11 +120,7 @@
 
     public void close() throws InterruptedException {
         if (zookeeper != null) {
-            try {
-                zookeeper.close();
-            } finally {
-                zookeeper = null;
-            }
+            zookeeper.close();
         }
     }
 
@@ -253,16 +251,47 @@
     public String toString() {
         return getZookeeper().toString();
     }
-    
+
     // Implementation methods
     //-------------------------------------------------------------------------
 
     protected ZooKeeper createZooKeeper() throws IOException {
         if (sessionId != 0 || sessionPasswd != null) {
-            return new ZooKeeper(host, sessionTimeout, defaultWatcher, sessionId, sessionPasswd);
+            return new ZooKeeper(host, sessionTimeout, watcher, sessionId, sessionPasswd);
+        } else {
+            return new ZooKeeper(host, sessionTimeout, watcher);
         }
-        else {
-            return new ZooKeeper(host, sessionTimeout, defaultWatcher);
+    }
+
+    /**
+     * Creates the default watcher to use
+     *
+     * @param defaultWatcher the default watcher which is invoked if it is not null
+     * @return a newly created watcher
+     */
+    protected Watcher createWatcher(final Watcher defaultWatcher) {
+        return new Watcher() {
+            public void process(WatcherEvent event) {
+                // fire not owner notifications if the connection disconnects
+                if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
+                    fireNotOwner();
+                }
+
+                int state = event.getState();
+                if (defaultWatcher != null) {
+                    defaultWatcher.process(event);
+                }
+            }
+        };
+    }
+
+
+    /**
+     * Fires the not owner event to any {@link WhenOwnerListener instances}
+     */
+    protected void fireNotOwner() {
+        for (WhenOwnerListener listener : whenOwnerListeners) {
+            listener.whenNotOwner();
         }
     }
 }

Modified: activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java?rev=679354&r1=679353&r2=679354&view=diff
==============================================================================
--- activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java
(original)
+++ activemq/sandbox/zookeeper/zookeeper-protocols/src/test/java/org/apache/zookeeper/protocols/WriteLockTest.java
Thu Jul 24 04:06:33 2008
@@ -55,11 +55,12 @@
             });
             WriteLock leader = new WriteLock(keeper, dir, new WhenOwnerListener() {
                 public void whenOwner() {
+                    LOG.info(">>>>>>>>>> OWNER ZNode " + nodeId);
                     latch.countDown();
                 }
 
                 public void whenNotOwner() {
-                    LOG.info("ZNode " + nodeId + " is no longer the owner!");
+                    LOG.info("<<<<<<<<<< NOT OWNER ZNode "
+ nodeId);
                 }
             });
             nodes[i] = leader;
@@ -84,24 +85,24 @@
 
         if (count > 1) {
             if (killLeader) {
-            System.out.println("Now killing the leader");
-            // now lets kill the leader
-            latch = new CountDownLatch(1);
-            first.unlock();
-            latch.await(30, TimeUnit.SECONDS);
+                System.out.println("Now killing the leader");
+                // now lets kill the leader
+                latch = new CountDownLatch(1);
+                first.unlock();
+                latch.await(30, TimeUnit.SECONDS);
 
-            //Thread.sleep(10000);
-            WriteLock second = nodes[1];
+                //Thread.sleep(10000);
+                WriteLock second = nodes[1];
 
-            dumpNodes(count);
+                dumpNodes(count);
 
-            // lets assert that the first election is the leader
-            assertTrue("The second znode should be the leader " + second.getId(), second.isOwner());
+                // lets assert that the first election is the leader
+                assertTrue("The second znode should be the leader " + second.getId(), second.isOwner());
 
-            for (int i = 2; i < count; i++) {
-                WriteLock node = nodes[i];
-                assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
-            }
+                for (int i = 2; i < count; i++) {
+                    WriteLock node = nodes[i];
+                    assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+                }
             }
 
 



Mime
View raw message