zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1239983 - in /zookeeper/branches/branch-3.3: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/
Date Fri, 03 Feb 2012 02:41:08 GMT
Author: mahadev
Date: Fri Feb  3 02:41:08 2012
New Revision: 1239983

URL: http://svn.apache.org/viewvc?rev=1239983&view=rev
Log:
ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart.
(Benjamin Reed via mahadev)

Added:
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java
Modified:
    zookeeper/branches/branch-3.3/CHANGES.txt
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Fri Feb  3 02:41:08 2012
@@ -11,6 +11,9 @@ BUGFIXES:
  
   ZOOKEEPER-973. bind() could fail on Leader because it does not
   setReuseAddress on its ServerSocket (Harsh J via phunt)
+  
+  ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after 
+  cluster restart. (Benjamin Reed via mahadev)
 
 Release 3.3.4 - 2011-11-16
 Backward compatible changes:

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Feb  3 02:41:08 2012
@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.NIOSe
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This Request processor actually applies any transaction associated with a
@@ -100,20 +101,10 @@ public class FinalRequestProcessor imple
                 }
             }
             if (request.hdr != null) {
-                rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
-                if (request.type == OpCode.createSession) {
-                    if (request.txn instanceof CreateSessionTxn) {
-                        CreateSessionTxn cst = (CreateSessionTxn) request.txn;
-                        zks.sessionTracker.addSession(request.sessionId, cst
-                                .getTimeOut());
-                    } else {
-                        LOG.warn("*****>>>>> Got "
-                                + request.txn.getClass() + " "
-                                + request.txn.toString());
-                    }
-                } else if (request.type == OpCode.closeSession) {
-                    zks.sessionTracker.removeSession(request.sessionId);
-                }
+               TxnHeader hdr = request.hdr;
+               Record txn = request.txn;
+
+               rc = zks.processTxn(hdr, txn);
             }
             // do not add non quorum packets to the queue.
             if (Request.isQuorum(request.type)) {

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Feb  3 02:41:08 2012
@@ -41,9 +41,12 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This class implements a simple standalone ZooKeeperServer. It sets up the
@@ -93,7 +96,6 @@ public class ZooKeeperServer implements 
     protected int maxSessionTimeout = -1;
     protected SessionTracker sessionTracker;
     private FileTxnSnapLog txnLogFactory = null;
-    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
     private ZKDatabase zkDb;
     protected long hzxid = 0;
     public final static Exception ok = new Exception("No prob");
@@ -240,8 +242,7 @@ public class ZooKeeperServer implements 
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         for (long session : zkDb.getSessions()) {
-            sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
-            if (sessionsWithTimeouts.get(session) == null) {
+          if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                 deadSessions.add(session);
             }
         }
@@ -368,7 +369,10 @@ public class ZooKeeperServer implements 
     }
     
     public void startup() {        
-        createSessionTracker();
+        if (sessionTracker == null) {
+            createSessionTracker();
+        }
+        startSessionTracker();
         setupRequestProcessors();
 
         registerJMX();
@@ -391,6 +395,9 @@ public class ZooKeeperServer implements 
     protected void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                 tickTime, 1);
+    }
+    
+    protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
     }
 
@@ -697,6 +704,27 @@ public class ZooKeeperServer implements 
     public String getState() {
         return "standalone";
     }
+    
+    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        ProcessTxnResult rc;
+        int opCode = hdr.getType();
+        long sessionId = hdr.getClientId();
+        rc = getZKDatabase().processTxn(hdr, txn);
+        if (opCode == OpCode.createSession) {
+            if (txn instanceof CreateSessionTxn) {
+                CreateSessionTxn cst = (CreateSessionTxn) txn;
+                sessionTracker.addSession(sessionId, cst
+                        .getTimeOut());
+            } else {
+                LOG.warn("*****>>>>> Got "
+                        + txn.getClass() + " "
+                        + txn.toString());
+            }
+        } else if (opCode == OpCode.closeSession) {
+            sessionTracker.removeSession(sessionId);
+        }
+        return rc;
+    }
 
     public void dumpEphemerals(PrintWriter pwriter) {
     	zkDb.dumpEphemerals(pwriter);

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Fri Feb  3 02:41:08 2012
@@ -77,9 +77,13 @@ public class LeaderZooKeeperServer exten
     }
     
     @Override
-    protected void createSessionTracker() {
+    public void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
                 tickTime, self.getId());
+    }
+    
+    @Override
+    protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
     }
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Fri Feb  3 02:41:08 2012
@@ -321,6 +321,7 @@ public class Learner {       
 
             }
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+            zk.createSessionTracker();
             if(LOG.isInfoEnabled()){
                 LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L));
             }
@@ -351,7 +352,7 @@ public class Learner {       
                     if (pif.hdr.getZxid() != qp.getZxid()) {
                         LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                     } else {
-                        zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+                        zk.processTxn(pif.hdr, pif.rec);
                         packetsNotCommitted.remove();
                     }
                     break;
@@ -360,7 +361,7 @@ public class Learner {       
                     ia = BinaryInputArchive
                             .getArchive(new ByteArrayInputStream(qp.getData()));
                     Record txn = SerializeUtils.deserializeTxn(ia, hdr);
-                    zk.getZKDatabase().processTxn(hdr, txn);
+                    zk.processTxn(hdr, txn);
                     break;
                 case Leader.UPTODATE:
                     zk.takeSnapshot();

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Fri Feb  3 02:41:08 2012
@@ -70,12 +70,15 @@ public abstract class LearnerZooKeeperSe
     }    
     
     @Override
-    protected void createSessionTracker() {
+    public void createSessionTracker() {
         sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
                 self.getId());
     }
     
     @Override
+    protected void startSessionTracker() {}
+    
+    @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
             int sessionTimeout) throws IOException, InterruptedException {
         getLearner().validateSession(cnxn, sessionId, sessionTimeout);

Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java?rev=1239983&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java Fri Feb  3 02:41:08 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.After;
+import org.junit.Test;
+
+public class SnapshotSessionTest {
+    QuorumUtil qu = new QuorumUtil(1);
+    private static final String ZOOKEEPER_SNAP_COUNT = "zookeeper.snapCount";
+    String snapCount = System.getProperty(ZOOKEEPER_SNAP_COUNT);
+    
+    public SnapshotSessionTest() {
+        System.setProperty(ZOOKEEPER_SNAP_COUNT, "10");
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        qu.shutdownAll();
+        if (snapCount == null) {
+            System.clearProperty(ZOOKEEPER_SNAP_COUNT);
+        } else {
+            System.setProperty(ZOOKEEPER_SNAP_COUNT, snapCount);
+        }
+    }
+    
+    class NullWatcher implements Watcher {
+        boolean connected;
+        @Override
+        synchronized public void process(WatchedEvent event) {
+            if (event.getState() == KeeperState.SyncConnected) {
+                connected = true;
+                notifyAll();
+            }
+        }
+        synchronized public void waitForConnected() throws InterruptedException {
+            while(!connected) {
+                wait();
+            }
+        }
+    };
+    
+    /**
+     * This test makes sure that session events in DIFFs are applied to the snapshot
+     * if they have been committed by the leader before the diff is sent.
+     * (see ZOOKEEPER-1367)
+     */
+    @Test
+    public void testSessionInSnapshot() throws Exception {
+        qu.startAll();
+        int follower = 1;
+        if (qu.getPeer(1).peer.follower == null) {
+            follower = 2;
+        }
+        String hostPort = qu.getConnString();
+        /* we want to prime the peers to make sure that we have a good
+         * base for a diff when the follower disconnects and reconnnects
+         */
+        ZooKeeper zk = new ZooKeeper(hostPort, 3000, new NullWatcher());
+        zk.setData("/", "foo".getBytes(), -1);
+        zk.close();
+        
+        qu.shutdown(follower);
+        
+        /* while the follower is down create a session and generate a bit of
+         * traffic.
+         */
+        NullWatcher nullWatcher = new NullWatcher();
+        zk = new ZooKeeper(hostPort, 3000, nullWatcher);
+        nullWatcher.waitForConnected();
+        pumpRequests(zk, "/", 20);
+        qu.restart(follower);
+        
+        /* make sure the session is there! */
+        Assert.assertNotNull("Session is not in snapshot!", qu.getPeer(follower).peer.follower.zk.getZKDatabase().getSessionWithTimeOuts().get(zk.getSessionId()));
+    }
+
+    private void pumpRequests(ZooKeeper zk, String path, int i) throws KeeperException, InterruptedException {
+        while(i > 0) {
+            zk.setData(path, ("set"+i).getBytes(), -1);
+            i--;
+        }
+    }
+}



Mime
View raw message