hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r706815 - in /hadoop/zookeeper/trunk: ./ src/ src/java/jmx/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/test/
Date Tue, 21 Oct 2008 23:49:45 GMT
Author: mahadev
Date: Tue Oct 21 16:49:45 2008
New Revision: 706815

URL: http://svn.apache.org/viewvc?rev=706815&view=rev
Log:
ZOOKEEPER-43. Server side of auto reset watches.

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
    hadoop/zookeeper/trunk/src/zookeeper.jute

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 21 16:49:45 2008
@@ -30,6 +30,8 @@
 
   ZOOKEEPER-33. Better ACL management
   (mahadev)
+   
+  ZOOKEEPER-43. Server side of auto reset watches.
 
 Backward compatibile changes:
 

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java Tue Oct 21 16:49:45 2008
@@ -87,9 +87,11 @@
         return result;
     }
 
-    public void deleteNode(String path) throws KeeperException.NoNodeException {
+    public void deleteNode(String path, long zxid)
+        throws KeeperException.NoNodeException
+    {
         DataNode deleted=getNode(path);
-        super.deleteNode(path);
+        super.deleteNode(path, zxid);
         ObserverManager.getInstance().notifyObservers(this,
                 new TreeEventInfo(Event.DELETE,deleted));
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Tue Oct 21 16:49:45 2008
@@ -60,6 +60,7 @@
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetACLResponse;
 import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SetWatches;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ZooTrace;
@@ -72,6 +73,19 @@
  */
 public class ClientCnxn {
     private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
+    
+    /** This controls whether automatic watch resetting is enabled.
+     * Clients automatically reset watches during session reconnect, this
+     * option allows the client to turn off this behavior by setting
+     * the environment variable "zookeeper.disableAutoWatchReset" to "true" */
+    public static boolean disableAutoWatchReset;
+    static {
+        // this var should not be public, but otw there is no easy way 
+        // to test
+        disableAutoWatchReset = 
+            Boolean.getBoolean("zookeeper.disableAutoWatchReset");
+        LOG.info("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset);
+    }
 
     private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
 
@@ -446,7 +460,7 @@
         finishPacket(p);
     }
 
-    long lastZxid;
+    volatile long lastZxid;
 
     /**
      * This class services the outgoing request queue and generates the heart
@@ -479,7 +493,8 @@
             if (sessionTimeout <= 0) {
                 zooKeeper.state = States.CLOSED;
 
-                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
+                eventThread.queueEvent(new WatchedEvent(
+                        Watcher.Event.EventType.None,
                         Watcher.Event.KeeperState.Expired, null));
                 throw new IOException("Session Expired");
             }
@@ -489,6 +504,15 @@
             sessionPasswd = conRsp.getPasswd();
             eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                     Watcher.Event.KeeperState.SyncConnected, null));
+            if (!disableAutoWatchReset) {
+                SetWatches sw = new SetWatches(lastZxid,
+                        zooKeeper.getDataWatches(),
+                        zooKeeper.getExistWatches(),
+                        zooKeeper.getChildWatches());
+                RequestHeader h = new RequestHeader();
+                h.setType(ZooDefs.OpCode.setWatches);
+                queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null);
+            }
         }
 
         void readResponse() throws IOException {
@@ -540,27 +564,31 @@
              * Since requests are processed in order, we better get a response
              * to the first request!
              */
-         if (packet.header.getXid() != replyHdr.getXid()) {
-         throw new IOException("Xid out of order. Got "
-                  + replyHdr.getXid() + " expected "
-                  + packet.header.getXid());
-            }
-
-         packet.replyHeader.setXid(replyHdr.getXid());
-         packet.replyHeader.setErr(replyHdr.getErr());
-         packet.replyHeader.setZxid(replyHdr.getZxid());
-         lastZxid = replyHdr.getZxid();
-         if (packet.response != null && replyHdr.getErr() == 0) {
-             packet.response.deserialize(bbia, "response");
-            }
-            packet.finished = true;
+            try {
+                if (packet.header.getXid() != replyHdr.getXid()) {
+                    packet.replyHeader.setErr(KeeperException.Code.ConnectionLoss);
+                    throw new IOException("Xid out of order. Got "
+                            + replyHdr.getXid() + " expected "
+                            + packet.header.getXid());
+                }
 
-            if (LOG.isDebugEnabled()) {
-            LOG.debug("Reading reply sessionid:0x"
-                + Long.toHexString(sessionId) + ", packet:: " + packet);
-        }
+                packet.replyHeader.setXid(replyHdr.getXid());
+                packet.replyHeader.setErr(replyHdr.getErr());
+                packet.replyHeader.setZxid(replyHdr.getZxid());
+                if (replyHdr.getZxid() > 0) {
+                    lastZxid = replyHdr.getZxid();
+                }
+                if (packet.response != null && replyHdr.getErr() == 0) {
+                    packet.response.deserialize(bbia, "response");
+                }
 
-            finishPacket(packet);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reading reply sessionid:0x"
+                            + Long.toHexString(sessionId) + ", packet:: " + packet);
+                }
+            } finally {
+                finishPacket(packet);
+            }
         }
 
         /**

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java Tue Oct 21 16:49:45 2008
@@ -49,6 +49,8 @@
         public final int ping = 11;
 
         public final int auth = 100;
+        
+        public final int setWatches = 101;
 
         public final int createSession = -10;
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Tue Oct 21 16:49:45 2008
@@ -19,6 +19,7 @@
 package org.apache.zookeeper;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -99,7 +100,20 @@
 
     private final ZKWatchManager watchManager = new ZKWatchManager();
 
-    /**
+    List<String> getDataWatches() {
+        List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
+        return rc;
+    }
+    List<String> getExistWatches() {
+        List<String> rc =  new ArrayList<String>(watchManager.existWatches.keySet());
+        return rc;
+    }
+    List<String> getChildWatches() {
+        List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
+        return rc;
+    }
+
+/**
      * Manage watchers & handle events generated by the ClientCnxn object.
      *
      * We are implementing this as a nested class of ZooKeeper so that
@@ -109,11 +123,19 @@
     private class ZKWatchManager implements ClientWatchManager {
         private final Map<String, Set<Watcher>> dataWatches =
             new HashMap<String, Set<Watcher>>();
+        private final Map<String, Set<Watcher>> existWatches =
+            new HashMap<String, Set<Watcher>>();
         private final Map<String, Set<Watcher>> childWatches =
             new HashMap<String, Set<Watcher>>();
-
+        
         private volatile Watcher defaultWatcher;
 
+        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
+            if (from != null) {
+                to.addAll(from);
+            }
+        }
+        
         /* (non-Javadoc)
          * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
          */
@@ -121,58 +143,60 @@
                                         Watcher.Event.EventType type, String path) {
             Set<Watcher> result = new HashSet<Watcher>();
 
-            // clear the watches if we are not connected
+            switch (type) {
+            case None:
+                result.add(defaultWatcher);
+                for(Set<Watcher> ws: dataWatches.values()) {
+                    result.addAll(ws);
+                }
+                for(Set<Watcher> ws: existWatches.values()) {
+                    result.addAll(ws);
+                }
+                for(Set<Watcher> ws: childWatches.values()) {
+                    result.addAll(ws);
+                }
 
-            if (state != Watcher.Event.KeeperState.SyncConnected) {
-                synchronized (dataWatches) {
-                    for (Set<Watcher> watchers : dataWatches.values()) {
-                        for (Watcher watcher : watchers) {
-                            result.add(watcher);
-                        }
+                // clear the watches if auto watch reset is not enabled
+                if (ClientCnxn.disableAutoWatchReset &&
+                        state != Watcher.Event.KeeperState.SyncConnected)
+                {
+                    synchronized(dataWatches) {
+                        dataWatches.clear();
                     }
-                    dataWatches.clear();
-                }
-                synchronized (childWatches) {
-                    for (Set<Watcher> watchers : childWatches.values()) {
-                        for (Watcher watcher : watchers) {
-                            result.add(watcher);
-                        }
+                    synchronized(existWatches) {
+                        existWatches.clear();
+                    }
+                    synchronized(childWatches) {
+                        childWatches.clear();
                     }
-                    childWatches.clear();
                 }
-            }
-
-            Set<Watcher> watchers = null;
 
-            switch (type) {
-            case None:
-                result.add(defaultWatcher);
                 return result;
             case NodeDataChanged:
             case NodeCreated:
                 synchronized (dataWatches) {
-                    watchers = dataWatches.remove(path);
+                    addTo(dataWatches.remove(path), result);
+                }
+                synchronized (existWatches) {
+                    addTo(existWatches.remove(path), result);
                 }
                 break;
             case NodeChildrenChanged:
                 synchronized (childWatches) {
-                    watchers = childWatches.remove(path);
+                    addTo(childWatches.remove(path), result);
                 }
                 break;
             case NodeDeleted:
                 synchronized (dataWatches) {
-                    watchers = dataWatches.remove(path);
+                    addTo(dataWatches.remove(path), result);
                 }
-                Set<Watcher> cwatches;
-                synchronized (childWatches) {
-                    cwatches = childWatches.remove(path);
+                // XXX This shouldn't be needed, but just in case
+                synchronized (existWatches) {
+                    addTo(existWatches.remove(path), result);
+                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                 }
-                if (cwatches != null) {
-                    if (watchers == null) {
-                        watchers = cwatches;
-                    } else {
-                        watchers.addAll(cwatches);
-                    }
+                synchronized (childWatches) {
+                    addTo(childWatches.remove(path), result);
                 }
                 break;
             default:
@@ -182,26 +206,24 @@
                 throw new RuntimeException(msg);
             }
 
-            result.addAll(watchers);
             return result;
         }
     }
-
+    
     /**
      * Register a watcher for a particular path.
      */
-    class WatchRegistration {
-        private Map<String, Set<Watcher>> watches;
+    abstract class WatchRegistration {
         private Watcher watcher;
         private String path;
-        public WatchRegistration(Map<String, Set<Watcher>> watches,
-                Watcher watcher, String path)
+        public WatchRegistration(Watcher watcher, String path)
         {
-            this.watches = watches;
             this.watcher = watcher;
             this.path = path;
         }
 
+        abstract protected Map<String, Set<Watcher>> getWatches(int rc);
+        
         /**
          * Register the watcher with the set of watches on path.
          * @param rc the result code of the operation that attempted to
@@ -209,6 +231,7 @@
          */
         public void register(int rc) {
             if (shouldAddWatch(rc)) {
+                Map<String, Set<Watcher>> watches = getWatches(rc);
                 synchronized(watches) {
                     Set<Watcher> watchers = watches.get(path);
                     if (watchers == null) {
@@ -234,17 +257,43 @@
      * even in the case where NONODE result code is returned.
      */
     class ExistsWatchRegistration extends WatchRegistration {
-        public ExistsWatchRegistration(Map<String, Set<Watcher>> watches,
-                Watcher watcher, String path)
-        {
-            super(watches, watcher, path);
+        public ExistsWatchRegistration(Watcher watcher, String path) {
+            super(watcher, path);
         }
+        
+        @Override
+        protected Map<String, Set<Watcher>> getWatches(int rc) {
+            return rc == 0 ?  watchManager.dataWatches : watchManager.existWatches;
+        }
+        
         @Override
         protected boolean shouldAddWatch(int rc) {
             return rc == 0 || rc == KeeperException.Code.NoNode;
         }
     }
 
+    class DataWatchRegistration extends WatchRegistration {
+        public DataWatchRegistration(Watcher watcher, String path) {
+            super(watcher, path);
+        }
+
+        @Override
+        protected Map<String, Set<Watcher>> getWatches(int rc) {
+            return watchManager.dataWatches;
+        }
+    }
+    
+    class ChildWatchRegistration extends WatchRegistration {
+        public ChildWatchRegistration(Watcher watcher, String path) {
+            super(watcher, path);
+        }
+
+        @Override
+        protected Map<String, Set<Watcher>> getWatches(int rc) {
+            return watchManager.childWatches;
+        }
+    }
+
     public enum States {
         CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
 
@@ -534,8 +583,7 @@
         SetDataResponse response = new SetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
-                    path);
+            wcb = new ExistsWatchRegistration(watcher, path);
         }
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
@@ -589,8 +637,7 @@
         SetDataResponse response = new SetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
-                    path);
+            wcb = new ExistsWatchRegistration(watcher, path);
         }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
                         ctx, wcb);
@@ -634,8 +681,7 @@
         GetDataResponse response = new GetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(watchManager.dataWatches, watcher,
-                    path);
+            wcb = new DataWatchRegistration(watcher, path);
         }
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
@@ -685,8 +731,7 @@
         GetDataResponse response = new GetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(watchManager.dataWatches, watcher,
-                    path);
+            wcb = new DataWatchRegistration(watcher, path);
         }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
                         ctx, wcb);
@@ -899,8 +944,7 @@
         GetChildrenResponse response = new GetChildrenResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(watchManager.childWatches, watcher,
-                    path);
+            wcb = new ChildWatchRegistration(watcher, path);
         }
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
@@ -950,8 +994,7 @@
         GetChildrenResponse response = new GetChildrenResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(watchManager.childWatches, watcher,
-                    path);
+            wcb = new ChildWatchRegistration(watcher, path);
         }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
                         ctx, wcb);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Tue Oct 21 16:49:45 2008
@@ -83,13 +83,19 @@
     }
 
     private static void printStat(Stat stat) {
-        System.err.println("ctime = " + new Date(stat.getCtime()).toString());
-        System.err.println("ctime = " + new Date(stat.getMtime()).toString());
-        System.err.println("cversion = " + stat.getCversion());
         System.err.println("cZxid = " + stat.getCzxid());
+        System.err.println("ctime = " + new Date(stat.getCtime()).toString());
         System.err.println("mZxid = " + stat.getMzxid());
+        System.err.println("mtime = " + new Date(stat.getMtime()).toString());
+        System.err.println("pZxid = " + stat.getPzxid());
+
+        System.err.println("cversion = " + stat.getCversion());
         System.err.println("dataVersion = " + stat.getVersion());
         System.err.println("aclVersion = " + stat.getAversion());
+        
+        System.err.println("ephemeralOwner = " + stat.getEphemeralOwner());
+        System.err.println("dataLength = " + stat.getDataLength());
+        System.err.println("numChildren = " + stat.getNumChildren());
     }
 
     public static void main(String args[]) throws NumberFormatException,

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java Tue Oct 21 16:49:45 2008
@@ -93,6 +93,7 @@
         to.setCzxid(stat.getCzxid());
         to.setMtime(stat.getMtime());
         to.setMzxid(stat.getMzxid());
+        to.setPzxid(stat.getPzxid());
         to.setVersion(stat.getVersion());
         to.setEphemeralOwner(stat.getEphemeralOwner());
         to.setDataLength(data.length);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Tue Oct 21 16:49:45 2008
@@ -34,14 +34,17 @@
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
@@ -229,6 +232,7 @@
         to.setCzxid(from.getCzxid());
         to.setMtime(from.getMtime());
         to.setMzxid(from.getMzxid());
+        to.setPzxid(from.getPzxid());
         to.setVersion(from.getVersion());
         to.setEphemeralOwner(from.getEphemeralOwner());
     }
@@ -240,6 +244,7 @@
         to.setCzxid(from.getCzxid());
         to.setMtime(from.getMtime());
         to.setMzxid(from.getMzxid());
+        to.setPzxid(from.getPzxid());
         to.setVersion(from.getVersion());
         to.setEphemeralOwner(from.getEphemeralOwner());
         to.setDataLength(from.getDataLength());
@@ -285,6 +290,7 @@
         stat.setMtime(time);
         stat.setCzxid(zxid);
         stat.setMzxid(zxid);
+        stat.setPzxid(zxid);
         stat.setVersion(0);
         stat.setAversion(0);
         stat.setEphemeralOwner(ephemeralOwner);
@@ -299,6 +305,7 @@
             int cver = parent.stat.getCversion();
             cver++;
             parent.stat.setCversion(cver);
+            parent.stat.setPzxid(zxid);
             Long longval = convertAcls(acl);
             DataNode child = new DataNode(parent, data, longval, stat);
             parent.children.add(childName);
@@ -321,10 +328,13 @@
 
     /**
      * remove the path from the datatree
-     * @param path the path to be deleted
+     * @param path the path to of the node to be deleted
+     * @param zxid the current zxid
      * @throws KeeperException.NoNodeException
      */
-    public void deleteNode(String path) throws KeeperException.NoNodeException {
+    public void deleteNode(String path, long zxid)
+        throws KeeperException.NoNodeException
+    {
         int lastSlash = path.lastIndexOf('/');
         String parentName = path.substring(0, lastSlash);
         String childName = path.substring(lastSlash + 1);
@@ -340,6 +350,7 @@
         synchronized (parent) {
             parent.children.remove(childName);
             parent.stat.setCversion(parent.stat.getCversion() + 1);
+            parent.stat.setPzxid(zxid);
             long eowner = node.stat.getEphemeralOwner();
             if (eowner != 0) {
                 HashSet<String> nodes = ephemerals.get(eowner);
@@ -522,7 +533,7 @@
             case OpCode.delete:
                 DeleteTxn deleteTxn = (DeleteTxn) txn;
                 debug = "Delete transaction for " + deleteTxn.getPath();
-                deleteNode(deleteTxn.getPath());
+                deleteNode(deleteTxn.getPath(), header.getZxid());
                 break;
             case OpCode.setData:
                 SetDataTxn setDataTxn = (SetDataTxn) txn;
@@ -537,7 +548,7 @@
                         setACLTxn.getVersion());
                 break;
             case OpCode.closeSession:
-                killSession(header.getClientId());
+                killSession(header.getClientId(), header.getZxid());
                 break;
             case OpCode.error:
                 ErrorTxn errTxn = (ErrorTxn) txn;
@@ -555,7 +566,7 @@
         return rc;
     }
 
-    void killSession(long session) {
+    void killSession(long session, long zxid) {
         // the list is already removed from the ephemerals
         // so we do not have to worry about synchronyzing on
         // the list. This is only called from FinalRequestProcessor
@@ -566,7 +577,7 @@
         if (list != null) {
             for (String path : list) {
                 try {
-                    deleteNode(path);
+                    deleteNode(path, zxid);
                     ZooTrace.logTraceMessage(LOG,
                                              ZooTrace.SESSION_TRACE_MASK,
                                              "Deleting ephemeral node "
@@ -727,4 +738,61 @@
         // dataWatches = null;
         // childWatches = null;
     }
+
+    public void setWatches(long relativeZxid, List<String> dataWatches, 
+            List<String> existWatches, List<String> childWatches, Watcher watcher) {
+        for(String path: dataWatches) {
+            DataNode node = getNode(path);
+            WatchedEvent e = null;
+            if (node == null) {
+                e = new WatchedEvent(EventType.NodeDeleted,
+                        KeeperState.SyncConnected, path);
+            } else if (node.stat.getCzxid() > relativeZxid) {
+                e = new WatchedEvent(EventType.NodeCreated,
+                        KeeperState.SyncConnected, path);
+            } else if (node.stat.getMzxid() > relativeZxid) {
+                e = new WatchedEvent(EventType.NodeDataChanged,
+                        KeeperState.SyncConnected, path);
+            }
+            if (e != null) {
+                watcher.process(e);
+            } else {
+                this.dataWatches.addWatch(path, watcher);
+            }
+        }
+        for(String path: existWatches) {
+            DataNode node = getNode(path);
+            WatchedEvent e = null;
+            if (node == null) {
+                // This is the case when the watch was registered
+            } else if (node.stat.getMzxid() > relativeZxid) {
+                e = new WatchedEvent(EventType.NodeDataChanged,
+                        KeeperState.SyncConnected, path);
+            } else {
+                e = new WatchedEvent(EventType.NodeCreated,
+                        KeeperState.SyncConnected, path);
+            }
+            if (e != null) {
+                watcher.process(e);
+            } else {
+                this.dataWatches.addWatch(path, watcher);
+            }
+        }
+        for(String path: childWatches) {
+            DataNode node = getNode(path);
+            WatchedEvent e = null;
+            if (node == null) {
+                e = new WatchedEvent(EventType.NodeDeleted,
+                        KeeperState.SyncConnected, path);
+            } else if (node.stat.getPzxid() > relativeZxid) {
+                e = new WatchedEvent(EventType.NodeChildrenChanged,
+                        KeeperState.SyncConnected, path);
+            }
+            if (e != null) {
+                watcher.process(e);
+            } else {
+                this.childWatches.addWatch(path, watcher);
+            }
+        }
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Oct 21 16:49:45 2008
@@ -43,6 +43,7 @@
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.SetACLResponse;
 import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SetWatches;
 import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -195,6 +196,17 @@
                         getDataRequest.getWatch() ? request.cnxn : null);
                 rsp = new GetDataResponse(b, stat);
                 break;
+            case OpCode.setWatches:
+                SetWatches setWatches = new SetWatches();
+                // XXX We really should NOT need this!!!!
+                request.request.rewind();
+                ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
+                long relativeZxid = setWatches.getRelativeZxid();
+                zks.dataTree.setWatches(relativeZxid, 
+                        setWatches.getDataWatches(), 
+                        setWatches.getExistWatches(),
+                        setWatches.getChildWatches(), request.cnxn);
+                break;
             case OpCode.getACL:
                 GetACLRequest getACLRequest = new GetACLRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Tue Oct 21 16:49:45 2008
@@ -352,6 +352,7 @@
             case OpCode.getACL:
             case OpCode.getChildren:
             case OpCode.ping:
+            case OpCode.setWatches:
                 break;
             }
         } catch (KeeperException e) {
@@ -379,9 +380,7 @@
         }
         request.hdr = txnHeader;
         request.txn = txn;
-        if (request.hdr != null) {
-            request.zxid = request.hdr.getZxid();
-        }
+        request.zxid = zks.getZxid();
         nextProcessor.processRequest(request);
     }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Tue Oct 21 16:49:45 2008
@@ -99,6 +99,7 @@
         case OpCode.getChildren:
         case OpCode.ping:
         case OpCode.closeSession:
+        case OpCode.setWatches:
             return true;
         default:
             return false;
@@ -131,6 +132,8 @@
             return "notification";
         case OpCode.create:
             return "create";
+        case OpCode.setWatches:
+            return "setWatches";
         case OpCode.delete:
             return "delete";
         case OpCode.exists:

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Oct 21 16:49:45 2008
@@ -191,7 +191,8 @@
         }
         dataTree.initialized = true;
         for (long session : deadSessions) {
-            killSession(session);
+            // XXX: Is lastProcessedZxid really the best thing to use?
+            killSession(session, dataTree.lastProcessedZxid);
         }
         // Make a clean snapshot
         takeSnapshot();
@@ -284,8 +285,8 @@
         submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
     }
 
-    protected void killSession(long sessionId) {
-        dataTree.killSession(sessionId);
+    protected void killSession(long sessionId, long zxid) {
+        dataTree.killSession(sessionId, zxid);
         ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                      "ZooKeeperServer --- killSession: 0x"
                 + Long.toHexString(sessionId));

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Oct 21 16:49:45 2008
@@ -27,6 +27,7 @@
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import junit.framework.TestCase;
 
@@ -70,11 +71,44 @@
     }
 
     protected static class CountdownWatcher implements Watcher {
+        // XXX this doesn't need to be volatile! (Should probably be final)
         volatile CountDownLatch clientConnected = new CountDownLatch(1);
-
-        public void process(WatchedEvent event) {
+        volatile boolean connected;
+        synchronized public void process(WatchedEvent event) {
             if (event.getState() == KeeperState.SyncConnected) {
+                connected = true;
+                notifyAll();
                 clientConnected.countDown();
+            } else {
+                connected = false;
+                notifyAll();
+            }
+        }
+        synchronized boolean isConnected() {
+            return connected;
+        }
+        synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
+            long expire = System.currentTimeMillis() + timeout;
+            long left = timeout;
+            while(!connected && left > 0) {
+                wait(left);
+                left = expire - System.currentTimeMillis();
+            }
+            if (!connected) {
+                throw new TimeoutException("Did not connect");
+         
+            }
+        }
+        synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
+            long expire = System.currentTimeMillis() + timeout;
+            long left = timeout;
+            while(connected && left > 0) {
+                wait(left);
+                left = expire - System.currentTimeMillis();
+            }
+            if (connected) {
+                throw new TimeoutException("Did not disconnect");
+         
             }
         }
     }
@@ -95,7 +129,7 @@
     protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
         throws IOException, InterruptedException
     {
-        ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
+        ZooKeeper zk = new ZooKeeper(hp, 9000, watcher);
         if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
                 TimeUnit.MILLISECONDS))
         {
@@ -264,6 +298,17 @@
         LOG.info("Client test setup finished");
     }
 
+    protected void stopServer() throws Exception {
+        LOG.info("STOPPING server");
+        shutdownServerInstance(serverFactory, hostPort);
+        serverFactory = null;
+    }
+    
+    protected void startServer() throws Exception {
+        LOG.info("STARTING server");
+        serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort);
+    }
+    
     @Override
     protected void tearDown() throws Exception {
         LOG.info("tearDown starting");

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Tue Oct 21 16:49:45 2008
@@ -21,7 +21,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.log4j.Logger;

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java Tue Oct 21 16:49:45 2008
@@ -25,15 +25,15 @@
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
-import java.io.IOException;
+
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ServerStats;
 import org.apache.zookeeper.server.SyncRequestProcessor;

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java Tue Oct 21 16:49:45 2008
@@ -63,6 +63,7 @@
         stat.setMtime(100);
         stat.setMzxid(100);
         stat.setNumChildren(100);
+        stat.setPzxid(100);
         stat.setVersion(100);
         
         return stat;
@@ -82,6 +83,7 @@
         zk.getData(name, false, stat);
         
         assertEquals(stat.getCzxid(), stat.getMzxid());
+        assertEquals(stat.getCzxid(), stat.getPzxid());
         assertEquals(stat.getCtime(), stat.getMtime());
         assertEquals(0, stat.getCversion());
         assertEquals(0, stat.getVersion());
@@ -109,6 +111,7 @@
         zk.getData(name, false, stat);
 
         assertEquals(stat.getCzxid(), stat.getMzxid());
+        assertEquals(stat.getCzxid() + 1, stat.getPzxid());
         assertEquals(stat.getCtime(), stat.getMtime());
         assertEquals(1, stat.getCversion());
         assertEquals(0, stat.getVersion());
@@ -121,6 +124,7 @@
         zk.getData(childname, false, stat);
 
         assertEquals(stat.getCzxid(), stat.getMzxid());
+        assertEquals(stat.getCzxid(), stat.getPzxid());
         assertEquals(stat.getCtime(), stat.getMtime());
         assertEquals(0, stat.getCversion());
         assertEquals(0, stat.getVersion());
@@ -149,6 +153,7 @@
             zk.getData(name, false, stat);
     
             assertEquals(stat.getCzxid(), stat.getMzxid());
+            assertEquals(stat.getCzxid() + i + 1, stat.getPzxid());
             assertEquals(stat.getCtime(), stat.getMtime());
             assertEquals(i + 1, stat.getCversion());
             assertEquals(0, stat.getVersion());
@@ -173,6 +178,7 @@
         zk.getData(name, false, stat);
         
         assertEquals(stat.getCzxid(), stat.getMzxid());
+        assertEquals(stat.getCzxid(), stat.getPzxid());
         assertEquals(stat.getCtime(), stat.getMtime());
         assertEquals(0, stat.getCversion());
         assertEquals(0, stat.getVersion());
@@ -187,6 +193,7 @@
         zk.getData(name, false, stat);
         
         assertNotSame(stat.getCzxid(), stat.getMzxid());
+        assertEquals(stat.getCzxid(), stat.getPzxid());
         assertNotSame(stat.getCtime(), stat.getMtime());
         assertEquals(0, stat.getCversion());
         assertEquals(1, stat.getVersion());

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java Tue Oct 21 16:49:45 2008
@@ -21,8 +21,10 @@
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -30,6 +32,7 @@
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Test;
@@ -89,7 +92,6 @@
                 String name = zk.create("/tc-", "initialvalue".getBytes(),
                         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
                 names[i] = name;
-                System.out.println(name);
     
                 Stat stat = new Stat();
                 zk.getData(name, watcher, stat);
@@ -116,4 +118,167 @@
         }
     }
 
+    @Test
+    public void testWatcherAutoResetWithGlobal() throws Exception {
+        ZooKeeper zk = null;
+        MyWatcher watcher = new MyWatcher();
+        zk = createClient(watcher, hostPort);
+        testWatcherAutoReset(zk, watcher, watcher);
+        zk.close();
+    }
+        
+    @Test
+    public void testWatcherAutoResetWithLocal() throws Exception {
+        ZooKeeper zk = null;
+        MyWatcher watcher = new MyWatcher();
+        zk = createClient(watcher, hostPort);
+        testWatcherAutoReset(zk, watcher, new MyWatcher());
+        zk.close();
+    }
+        
+    @Test
+    public void testWatcherAutoResetDisabledWithGlobal() throws Exception {
+        ClientCnxn.disableAutoWatchReset = true;
+        testWatcherAutoResetWithGlobal();
+    }
+        
+    @Test
+    public void testWatcherAutoResetDisabledWithLocal() throws Exception {
+        ClientCnxn.disableAutoWatchReset = true;
+        testWatcherAutoResetWithLocal();
+    }
+        
+    private void testWatcherAutoReset(ZooKeeper zk, MyWatcher globalWatcher, 
+            MyWatcher localWatcher) throws Exception {
+        boolean isGlobal = (localWatcher == globalWatcher);
+        // First test to see if the watch survives across reconnects
+        zk.create("/watchtest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/watchtest/child", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        if (isGlobal) {
+            zk.getChildren("/watchtest", true);
+            zk.getData("/watchtest/child", true, new Stat());
+            zk.exists("/watchtest/child2", true);
+        } else {
+            zk.getChildren("/watchtest", localWatcher);
+            zk.getData("/watchtest/child", localWatcher, new Stat());
+            zk.exists("/watchtest/child2", localWatcher);
+        }
+        
+        assertTrue(localWatcher.events.isEmpty());
+        
+        stopServer();
+        globalWatcher.waitForDisconnected(3000);
+        localWatcher.waitForDisconnected(500);
+        startServer();
+        globalWatcher.waitForConnected(3000);
+        if (!isGlobal && !ClientCnxn.disableAutoWatchReset) {
+            localWatcher.waitForConnected(500);
+        }
+        
+        assertTrue(localWatcher.events.isEmpty());
+        zk.setData("/watchtest/child", new byte[1], -1);
+        zk.create("/watchtest/child2", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        
+        WatchedEvent e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS);
+        if (!ClientCnxn.disableAutoWatchReset) {
+            assertEquals(e.getPath(), EventType.NodeDataChanged, e.getType());
+            assertEquals("/watchtest/child", e.getPath());
+        } else {
+            assertNull("unexpected event", e);
+        }
+    
+        e = localWatcher.events.poll(1000, TimeUnit.MILLISECONDS);
+        if (!ClientCnxn.disableAutoWatchReset) {
+            // The create will trigger the get children and the exist
+            // watches
+            assertEquals(EventType.NodeCreated, e.getType());
+            assertEquals("/watchtest/child2", e.getPath());
+        } else {
+            assertNull("unexpected event", e);
+        }
+
+        e = localWatcher.events.poll(1000, TimeUnit.MILLISECONDS);
+        if (!ClientCnxn.disableAutoWatchReset) {
+            assertEquals(EventType.NodeChildrenChanged, e.getType());
+            assertEquals("/watchtest", e.getPath());
+        } else {
+            assertNull("unexpected event", e);
+        }
+        
+        // Make sure PINGs don't screw us up!
+        Thread.sleep(4000);
+        
+        assertTrue(localWatcher.events.isEmpty()); // ensure no late arrivals
+        stopServer();
+        globalWatcher.waitForDisconnected(3000);
+        try {
+        try {
+            localWatcher.waitForDisconnected(500);
+            if (!isGlobal && !ClientCnxn.disableAutoWatchReset) {
+                fail("Got an event when I shouldn't have");
+            }
+        } catch(TimeoutException toe) {
+            if (ClientCnxn.disableAutoWatchReset) {
+                fail("Didn't get an event when I should have");
+            }
+            // Else what we are expecting since there are no outstanding watches
+        }
+        } catch (Exception e1) {
+            LOG.error("bad", e1);
+            throw new RuntimeException(e1);
+        }
+        startServer();
+        globalWatcher.waitForConnected(3000);
+        
+        if (isGlobal) {
+            zk.getChildren("/watchtest", true);
+            zk.getData("/watchtest/child", true, new Stat());
+            zk.exists("/watchtest/child2", true);
+        } else {
+            zk.getChildren("/watchtest", localWatcher);
+            zk.getData("/watchtest/child", localWatcher, new Stat());
+            zk.exists("/watchtest/child2", localWatcher);
+        }
+        
+        // Do trigger an event to make sure that we do not get
+        // it later
+        zk.delete("/watchtest/child2", -1);
+        
+        e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS);
+        assertEquals(EventType.NodeDeleted, e.getType());
+        assertEquals("/watchtest/child2", e.getPath());
+        
+        e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS);
+        assertEquals(EventType.NodeChildrenChanged, e.getType());
+        assertEquals("/watchtest", e.getPath());
+        
+        assertTrue(localWatcher.events.isEmpty());
+        
+        stopServer();
+        globalWatcher.waitForDisconnected(3000);
+        localWatcher.waitForDisconnected(500);
+        startServer();
+        globalWatcher.waitForConnected(3000);
+        if (!isGlobal && !ClientCnxn.disableAutoWatchReset) {
+            localWatcher.waitForConnected(500);
+        }
+        
+        zk.delete("/watchtest/child", -1);
+        zk.delete("/watchtest", -1);
+        
+        e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS);
+        if (!ClientCnxn.disableAutoWatchReset) {
+            assertEquals(EventType.NodeDeleted, e.getType());
+            assertEquals("/watchtest/child", e.getPath());
+        } else {
+            assertNull("unexpected event", e);
+        }
+        
+        // Make sure nothing is straggling!
+        Thread.sleep(1000);
+        assertTrue(localWatcher.events.isEmpty());
+        
+    }
+
 }

Modified: hadoop/zookeeper/trunk/src/zookeeper.jute
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/zookeeper.jute?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/zookeeper.jute (original)
+++ hadoop/zookeeper/trunk/src/zookeeper.jute Tue Oct 21 16:49:45 2008
@@ -37,6 +37,7 @@
         long ephemeralOwner; // owner id if ephemeral, 0 otw
         int dataLength;  //length of the data in the node
         int numChildren; //number of children of this node
+        long pzxid;      // last modified children
     }
     // information explicitly stored by the server persistently
     class StatPersisted {
@@ -48,6 +49,7 @@
         int cversion;    // child version
         int aversion;    // acl version
         long ephemeralOwner; // owner id if ephemeral, 0 otw
+        long pzxid;      // last modified children
     }
 
    // information explicitly stored by the version 1 database of servers 
@@ -83,6 +85,12 @@
         long sessionId;
         buffer passwd;
     }
+    class SetWatches {
+        long relativeZxid;
+        vector<ustring>dataWatches;
+        vector<ustring>existWatches;
+        vector<ustring>childWatches;
+    }        
     class RequestHeader {
         int xid;
         int type;



Mime
View raw message