Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@locus.apache.org Received: (qmail 92001 invoked from network); 24 Sep 2008 21:05:56 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Sep 2008 21:05:55 -0000 Received: (qmail 32390 invoked by uid 500); 24 Sep 2008 21:05:53 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 32374 invoked by uid 500); 24 Sep 2008 21:05:53 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 32357 invoked by uid 99); 24 Sep 2008 21:05:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Sep 2008 14:05:53 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Sep 2008 21:04:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D31BC23888A0; Wed, 24 Sep 2008 14:05:26 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r698734 - in /hadoop/zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/ClientCnxn.java src/java/main/org/apache/zookeeper/ZooKeeper.java Date: Wed, 24 Sep 2008 21:05:26 -0000 To: zookeeper-commits@hadoop.apache.org From: breed@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080924210526.D31BC23888A0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: breed Date: Wed Sep 24 14:05:26 2008 New Revision: 698734 URL: http://svn.apache.org/viewvc?rev=698734&view=rev Log: ZOOKEEPER-137 client watcher objects can lose events Modified: 
hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=698734&r1=698733&r2=698734&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Wed Sep 24 14:05:26 2008 @@ -64,3 +64,5 @@ ZOOKEEPER-131. Fix Old leader election can elect a dead leader over and over again. (breed via mahadev) + + ZOOKEEPER-137. client watcher objects can lose events (Patrick Hunt via breed) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=698734&r1=698733&r2=698734&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Sep 24 14:05:26 2008 @@ -94,9 +94,6 @@ */ private LinkedList pendingQueue = new LinkedList(); - private LinkedBlockingQueue waitingEvents = - new LinkedBlockingQueue(); - /** * These are the packets that need to be sent. 
*/ @@ -112,7 +109,7 @@ private final ZooKeeper zooKeeper; - private final Watcher watcher; + private final ClientWatchManager watcher; private long sessionId; @@ -206,7 +203,7 @@ } public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper, - Watcher watcher) + ClientWatchManager watcher) throws IOException { this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]); @@ -226,7 +223,7 @@ * @throws IOException */ public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper, - Watcher watcher, long sessionId, byte[] sessionPasswd) + ClientWatchManager watcher, long sessionId, byte[] sessionPasswd) throws IOException { this.zooKeeper = zooKeeper; @@ -251,11 +248,11 @@ readTimeout = sessionTimeout * 2 / 3; Collections.shuffle(serverAddrs); sendThread = new SendThread(); - sendThread.start(); eventThread = new EventThread(); + sendThread.start(); eventThread.start(); } - + WatcherEvent eventOfDeath = new WatcherEvent(); final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() { @@ -264,12 +261,43 @@ } }; + private class WatcherSetEventPair { + private final Set watchers; + private final WatcherEvent event; + + public WatcherSetEventPair(Set watchers, WatcherEvent event) { + this.watchers = watchers; + this.event = event; + } + } + class EventThread extends Thread { + private final LinkedBlockingQueue waitingEvents = + new LinkedBlockingQueue(); + EventThread() { super(currentThread().getName() + "-EventThread"); setUncaughtExceptionHandler(uncaughtExceptionHandler); setDaemon(true); } + + public void queueEvent(WatcherEvent event) { + // materialize the watchers based on the event + WatcherSetEventPair pair = new WatcherSetEventPair( + watcher.materialize(event.getState(), event.getType(), + event.getPath()), + event); + // queue the pair (watch set & event) for later processing + waitingEvents.add(pair); + } + + public void queuePacket(Packet packet) { + waitingEvents.add(packet); + } + + public void 
queueEventOfDeath() { + waitingEvents.add(eventOfDeath); + } @Override public void run() { @@ -279,8 +307,12 @@ if (event == eventOfDeath) { break; } - if (event instanceof WatcherEvent) { - watcher.process((WatcherEvent) event); + if (event instanceof WatcherSetEventPair) { + // each watcher will process the event + WatcherSetEventPair pair = (WatcherSetEventPair)event; + for (Watcher watcher: pair.watchers) { + watcher.process(pair.event); + } } else { Packet p = (Packet) event; int rc = 0; @@ -362,7 +394,6 @@ } } - @SuppressWarnings("unchecked") private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); @@ -375,7 +406,7 @@ } } else { p.finished = true; - waitingEvents.add(p); + eventThread.queuePacket(p); } } @@ -428,7 +459,7 @@ int sessionTimeout = conRsp.getTimeOut(); if (sessionTimeout <= 0) { zooKeeper.state = States.CLOSED; - waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone, + eventThread.queueEvent(new WatcherEvent(Watcher.Event.EventNone, Watcher.Event.KeeperStateExpired, null)); throw new IOException("Session Expired"); } @@ -436,11 +467,10 @@ connectTimeout = sessionTimeout / serverAddrs.size(); sessionId = conRsp.getSessionId(); sessionPasswd = conRsp.getPasswd(); - waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone, + eventThread.queueEvent(new WatcherEvent(Watcher.Event.EventNone, Watcher.Event.KeeperStateSyncConnected, null)); } - @SuppressWarnings("unchecked") void readResponse() throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); @@ -461,9 +491,13 @@ // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); - // System.out.println("Got an event: " + event + " for " + - // sessionId + " through" + _cnxn); - waitingEvents.add(event); + + if (LOG.isDebugEnabled()) { + LOG.debug("Got an event: " + event + " for sessionid 0x" + + Long.toHexString(sessionId)); + } + + 
eventThread.queueEvent(event); return; } if (pendingQueue.size() == 0) { @@ -763,8 +797,10 @@ e); cleanup(); if (zooKeeper.state.isAlive()) { - waitingEvents.add(new WatcherEvent(Event.EventNone, - Event.KeeperStateDisconnected, null)); + eventThread.queueEvent(new WatcherEvent( + Event.EventNone, + Event.KeeperStateDisconnected, + null)); } now = System.currentTimeMillis(); @@ -842,13 +878,12 @@ * method is primarily here to allow the tests to verify disconnection * behavior. */ - @SuppressWarnings("unchecked") public void disconnect() { LOG.info("Disconnecting ClientCnxn for session: 0x" + Long.toHexString(getSessionId())); sendThread.close(); - waitingEvents.add(eventOfDeath); + eventThread.queueEventOfDeath(); } /** Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=698734&r1=698733&r2=698734&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Sep 24 14:05:26 2008 @@ -106,37 +106,35 @@ public class ZooKeeper { private static final Logger LOG = Logger.getLogger(ZooKeeper.class); - private volatile Watcher defaultWatcher; - - private final Map<String, Set<Watcher>> dataWatches = - new HashMap<String, Set<Watcher>>(); - private final Map<String, Set<Watcher>> childWatches = - new HashMap<String, Set<Watcher>>(); - + private final ZKWatchManager watchManager = new ZKWatchManager(); + /** - * Process watch events generated by the ClientCnxn object. + * Manage watchers & handle events generated by the ClientCnxn object. * * We are implementing this as a nested class of ZooKeeper so that - * the public Watcher.process(event) method will not be exposed as part - * of the ZooKeeper client API. + * the public methods will not be exposed as part of the ZooKeeper client + * API. 
*/ - private class ZKWatcher implements Watcher { - /** - * Process a WatchEvent. - * - * Looks up the watch in the set of watches, processes the event - * if found, otw uses the default watcher (registered during instance - * creation) to process the watch. - * - * @param event the event to process. + private class ZKWatchManager implements ClientWatchManager { + private final Map<String, Set<Watcher>> dataWatches = + new HashMap<String, Set<Watcher>>(); + private final Map<String, Set<Watcher>> childWatches = + new HashMap<String, Set<Watcher>>(); + + private volatile Watcher defaultWatcher; + + /* (non-Javadoc) + * @see org.apache.zookeeper.ClientWatchManager#materialize(int, int, java.lang.String) */ - public void process(WatcherEvent event) { + public Set materialize(int state, int type, String path) { + Set result = new HashSet(); + // clear the watches if we are not connected - if (event.getState() != Watcher.Event.KeeperStateSyncConnected) { + if (state != Watcher.Event.KeeperStateSyncConnected) { synchronized (dataWatches) { for (Set watchers : dataWatches.values()) { for (Watcher watcher : watchers) { - watcher.process(event); + result.add(watcher); } } dataWatches.clear(); @@ -144,7 +142,7 @@ synchronized (childWatches) { for (Set watchers : childWatches.values()) { for (Watcher watcher : watchers) { - watcher.process(event); + result.add(watcher); } } childWatches.clear(); @@ -153,28 +151,28 @@ Set watchers = null; - switch (event.getType()) { + switch (type) { case Watcher.Event.EventNone: - defaultWatcher.process(event); - return; + result.add(defaultWatcher); + return result; case Watcher.Event.EventNodeDataChanged: case Watcher.Event.EventNodeCreated: synchronized (dataWatches) { - watchers = dataWatches.remove(event.getPath()); + watchers = dataWatches.remove(path); } break; case Watcher.Event.EventNodeChildrenChanged: synchronized (childWatches) { - watchers = childWatches.remove(event.getPath()); + watchers = childWatches.remove(path); } break; case Watcher.Event.EventNodeDeleted: synchronized (dataWatches) { - watchers = 
dataWatches.remove(event.getPath()); + watchers = dataWatches.remove(path); } Set cwatches; synchronized (childWatches) { - cwatches = childWatches.remove(event.getPath()); + cwatches = childWatches.remove(path); } if (cwatches != null) { if (watchers == null) { @@ -185,16 +183,14 @@ } break; default: - String msg = "Unhandled watch event type " + event.getType(); + String msg = "Unhandled watch event type " + type + + " with state " + state + " on path " + path; LOG.error(msg); throw new RuntimeException(msg); } - if (watchers != null) { - for (Watcher watcher : watchers) { - watcher.process(event); - } - } + result.addAll(watchers); + return result; } } @@ -270,14 +266,14 @@ public ZooKeeper(String host, int sessionTimeout, Watcher watcher) throws IOException { - this.defaultWatcher = watcher; - cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher()); + watchManager.defaultWatcher = watcher; + cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager); } public ZooKeeper(String host, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) throws IOException { - this.defaultWatcher = watcher; - cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher(), + watchManager.defaultWatcher = watcher; + cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager, sessionId, sessionPasswd); } @@ -301,7 +297,7 @@ } public synchronized void register(Watcher watcher) { - this.defaultWatcher = watcher; + watchManager.defaultWatcher = watcher; } /** @@ -503,7 +499,8 @@ SetDataResponse response = new SetDataResponse(); WatchRegistration wcb = null; if (watcher != null) { - wcb = new ExistsWatchRegistration(dataWatches, watcher, path); + wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher, + path); } ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { @@ -537,7 +534,7 @@ public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - return exists(path, 
watch ? defaultWatcher : null); + return exists(path, watch ? watchManager.defaultWatcher : null); } /** @@ -557,7 +554,8 @@ SetDataResponse response = new SetDataResponse(); WatchRegistration wcb = null; if (watcher != null) { - wcb = new ExistsWatchRegistration(dataWatches, watcher, path); + wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher, + path); } cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path, ctx, wcb); @@ -570,7 +568,7 @@ * @see #exists(String, boolean) */ public void exists(String path, boolean watch, StatCallback cb, Object ctx) { - exists(path, watch ? defaultWatcher : null, cb, ctx); + exists(path, watch ? watchManager.defaultWatcher : null, cb, ctx); } /** @@ -601,7 +599,8 @@ GetDataResponse response = new GetDataResponse(); WatchRegistration wcb = null; if (watcher != null) { - wcb = new WatchRegistration(dataWatches, watcher, path); + wcb = new WatchRegistration(watchManager.dataWatches, watcher, + path); } ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { @@ -633,7 +632,7 @@ */ public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException { - return getData(path, watch ? defaultWatcher : null, stat); + return getData(path, watch ? watchManager.defaultWatcher : null, stat); } /** @@ -651,7 +650,8 @@ GetDataResponse response = new GetDataResponse(); WatchRegistration wcb = null; if (watcher != null) { - wcb = new WatchRegistration(dataWatches, watcher, path); + wcb = new WatchRegistration(watchManager.dataWatches, watcher, + path); } cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path, ctx, wcb); @@ -664,7 +664,7 @@ * @see #getData(String, boolean, Stat) */ public void getData(String path, boolean watch, DataCallback cb, Object ctx) { - getData(path, watch ? defaultWatcher : null, cb, ctx); + getData(path, watch ? 
watchManager.defaultWatcher : null, cb, ctx); } /** @@ -862,7 +862,8 @@ GetChildrenResponse response = new GetChildrenResponse(); WatchRegistration wcb = null; if (watcher != null) { - wcb = new WatchRegistration(childWatches, watcher, path); + wcb = new WatchRegistration(watchManager.childWatches, watcher, + path); } ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { @@ -893,7 +894,7 @@ */ public List getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - return getChildren(path, watch ? defaultWatcher : null); + return getChildren(path, watch ? watchManager.defaultWatcher : null); } /** @@ -912,7 +913,8 @@ GetChildrenResponse response = new GetChildrenResponse(); WatchRegistration wcb = null; if (watcher != null) { - wcb = new WatchRegistration(childWatches, watcher, path); + wcb = new WatchRegistration(watchManager.childWatches, watcher, + path); } cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path, ctx, wcb); @@ -926,7 +928,7 @@ */ public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx) { - getChildren(path, watch ? defaultWatcher : null, cb, ctx); + getChildren(path, watch ? watchManager.defaultWatcher : null, cb, ctx); } /**