Author: mahadev
Date: Fri Feb 3 02:41:08 2012
New Revision: 1239983
URL: http://svn.apache.org/viewvc?rev=1239983&view=rev
Log:
ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart.
(Benjamin Reed via mahadev)
Added:
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java
Modified:
zookeeper/branches/branch-3.3/CHANGES.txt
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Fri Feb 3 02:41:08 2012
@@ -11,6 +11,9 @@ BUGFIXES:
ZOOKEEPER-973. bind() could fail on Leader because it does not
setReuseAddress on its ServerSocket (Harsh J via phunt)
+
+ ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after
+ cluster restart. (Benjamin Reed via mahadev)
Release 3.3.4 - 2011-11-16
Backward compatible changes:
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Feb 3 02:41:08 2012
@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.NIOSe
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.TxnHeader;
/**
* This Request processor actually applies any transaction associated with a
@@ -100,20 +101,10 @@ public class FinalRequestProcessor imple
}
}
if (request.hdr != null) {
- rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
- if (request.type == OpCode.createSession) {
- if (request.txn instanceof CreateSessionTxn) {
- CreateSessionTxn cst = (CreateSessionTxn) request.txn;
- zks.sessionTracker.addSession(request.sessionId, cst
- .getTimeOut());
- } else {
- LOG.warn("*****>>>>> Got "
- + request.txn.getClass() + " "
- + request.txn.toString());
- }
- } else if (request.type == OpCode.closeSession) {
- zks.sessionTracker.removeSession(request.sessionId);
- }
+ TxnHeader hdr = request.hdr;
+ Record txn = request.txn;
+
+ rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Feb 3 02:41:08 2012
@@ -41,9 +41,12 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnHeader;
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
@@ -93,7 +96,6 @@ public class ZooKeeperServer implements
protected int maxSessionTimeout = -1;
protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory = null;
- private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
private ZKDatabase zkDb;
protected long hzxid = 0;
public final static Exception ok = new Exception("No prob");
@@ -240,8 +242,7 @@ public class ZooKeeperServer implements
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (long session : zkDb.getSessions()) {
- sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
- if (sessionsWithTimeouts.get(session) == null) {
+ if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
@@ -368,7 +369,10 @@ public class ZooKeeperServer implements
}
public void startup() {
- createSessionTracker();
+ if (sessionTracker == null) {
+ createSessionTracker();
+ }
+ startSessionTracker();
setupRequestProcessors();
registerJMX();
@@ -391,6 +395,9 @@ public class ZooKeeperServer implements
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1);
+ }
+
+ protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
@@ -697,6 +704,27 @@ public class ZooKeeperServer implements
public String getState() {
return "standalone";
}
+
+ public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+ ProcessTxnResult rc;
+ int opCode = hdr.getType();
+ long sessionId = hdr.getClientId();
+ rc = getZKDatabase().processTxn(hdr, txn);
+ if (opCode == OpCode.createSession) {
+ if (txn instanceof CreateSessionTxn) {
+ CreateSessionTxn cst = (CreateSessionTxn) txn;
+ sessionTracker.addSession(sessionId, cst
+ .getTimeOut());
+ } else {
+ LOG.warn("*****>>>>> Got "
+ + txn.getClass() + " "
+ + txn.toString());
+ }
+ } else if (opCode == OpCode.closeSession) {
+ sessionTracker.removeSession(sessionId);
+ }
+ return rc;
+ }
public void dumpEphemerals(PrintWriter pwriter) {
zkDb.dumpEphemerals(pwriter);
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Fri Feb 3 02:41:08 2012
@@ -77,9 +77,13 @@ public class LeaderZooKeeperServer exten
}
@Override
- protected void createSessionTracker() {
+ public void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
tickTime, self.getId());
+ }
+
+ @Override
+ protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Fri Feb 3 02:41:08 2012
@@ -321,6 +321,7 @@ public class Learner {
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+ zk.createSessionTracker();
if(LOG.isInfoEnabled()){
LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >>
32L));
}
@@ -351,7 +352,7 @@ public class Learner {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
- zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+ zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
break;
@@ -360,7 +361,7 @@ public class Learner {
ia = BinaryInputArchive
.getArchive(new ByteArrayInputStream(qp.getData()));
Record txn = SerializeUtils.deserializeTxn(ia, hdr);
- zk.getZKDatabase().processTxn(hdr, txn);
+ zk.processTxn(hdr, txn);
break;
case Leader.UPTODATE:
zk.takeSnapshot();
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=1239983&r1=1239982&r2=1239983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Fri Feb 3 02:41:08 2012
@@ -70,12 +70,15 @@ public abstract class LearnerZooKeeperSe
}
@Override
- protected void createSessionTracker() {
+ public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
self.getId());
}
@Override
+ protected void startSessionTracker() {}
+
+ @Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException, InterruptedException {
getLearner().validateSession(cnxn, sessionId, sessionTimeout);
Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java?rev=1239983&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/SnapshotSessionTest.java Fri Feb 3 02:41:08 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.After;
+import org.junit.Test;
+
+public class SnapshotSessionTest {
+ QuorumUtil qu = new QuorumUtil(1);
+ private static final String ZOOKEEPER_SNAP_COUNT = "zookeeper.snapCount";
+ String snapCount = System.getProperty(ZOOKEEPER_SNAP_COUNT);
+
+ public SnapshotSessionTest() {
+ System.setProperty(ZOOKEEPER_SNAP_COUNT, "10");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ qu.shutdownAll();
+ if (snapCount == null) {
+ System.clearProperty(ZOOKEEPER_SNAP_COUNT);
+ } else {
+ System.setProperty(ZOOKEEPER_SNAP_COUNT, snapCount);
+ }
+ }
+
+ class NullWatcher implements Watcher {
+ boolean connected;
+ @Override
+ synchronized public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.SyncConnected) {
+ connected = true;
+ notifyAll();
+ }
+ }
+ synchronized public void waitForConnected() throws InterruptedException {
+ while(!connected) {
+ wait();
+ }
+ }
+ };
+
+ /**
+ * This test makes sure that session events in DIFFs are applied to the snapshot
+ * if they have been committed by the leader before the diff is sent.
+ * (see ZOOKEEPER-1367)
+ */
+ @Test
+ public void testSessionInSnapshot() throws Exception {
+ qu.startAll();
+ int follower = 1;
+ if (qu.getPeer(1).peer.follower == null) {
+ follower = 2;
+ }
+ String hostPort = qu.getConnString();
+ /* we want to prime the peers to make sure that we have a good
+ * base for a diff when the follower disconnects and reconnects
+ */
+ ZooKeeper zk = new ZooKeeper(hostPort, 3000, new NullWatcher());
+ zk.setData("/", "foo".getBytes(), -1);
+ zk.close();
+
+ qu.shutdown(follower);
+
+ /* while the follower is down create a session and generate a bit of
+ * traffic.
+ */
+ NullWatcher nullWatcher = new NullWatcher();
+ zk = new ZooKeeper(hostPort, 3000, nullWatcher);
+ nullWatcher.waitForConnected();
+ pumpRequests(zk, "/", 20);
+ qu.restart(follower);
+
+ /* make sure the session is there! */
+ Assert.assertNotNull("Session is not in snapshot!", qu.getPeer(follower).peer.follower.zk.getZKDatabase().getSessionWithTimeOuts().get(zk.getSessionId()));
+ }
+
+ private void pumpRequests(ZooKeeper zk, String path, int i) throws KeeperException, InterruptedException {
+ while(i > 0) {
+ zk.setData(path, ("set"+i).getBytes(), -1);
+ i--;
+ }
+ }
+}
|