Author: mahadev
Date: Tue Jan 31 06:45:45 2012
New Revision: 1238176
URL: http://svn.apache.org/viewvc?rev=1238176&view=rev
Log:
ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart.
(Benjamin Reed via mahadev)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jan 31 06:45:45 2012
@@ -104,7 +104,10 @@ BUGFIXES:
ZOOKEEPER-973. bind() could fail on Leader because it does not
setReuseAddress on its ServerSocket (Harsh J via phunt)
-
+
+ ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart.
+ (Benjamin Reed via mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Jan 31 06:45:45 2012
@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.DataT
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.OpResult.CheckResult;
@@ -107,20 +108,10 @@ public class FinalRequestProcessor imple
}
}
if (request.getHdr() != null) {
- rc = zks.getZKDatabase().processTxn(request.getHdr(), request.getTxn());
- if (request.type == OpCode.createSession) {
- if (request.getTxn() instanceof CreateSessionTxn) {
- CreateSessionTxn cst = (CreateSessionTxn) request.getTxn();
- zks.sessionTracker.addSession(request.sessionId, cst
- .getTimeOut());
- } else {
- LOG.warn("*****>>>>> Got "
- + request.getTxn().getClass() + " "
- + request.getTxn().toString());
- }
- } else if (request.type == OpCode.closeSession) {
- zks.sessionTracker.removeSession(request.sessionId);
- }
+ TxnHeader hdr = request.getHdr();
+ Record txn = request.getTxn();
+
+ rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Jan 31 06:45:45 2012
@@ -53,6 +53,7 @@ import org.apache.zookeeper.proto.GetSAS
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetSASLResponse;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
@@ -60,6 +61,9 @@ import org.apache.zookeeper.server.auth.
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+
import javax.security.sasl.SaslException;
/**
@@ -87,7 +91,6 @@ public class ZooKeeperServer implements
protected int maxSessionTimeout = -1;
protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory = null;
- private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
private ZKDatabase zkDb;
protected long hzxid = 0;
public final static Exception ok = new Exception("No prob");
@@ -228,8 +231,7 @@ public class ZooKeeperServer implements
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {
- sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
- if (sessionsWithTimeouts.get(session) == null) {
+ if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
@@ -357,7 +359,10 @@ public class ZooKeeperServer implements
}
public void startup() {
- createSessionTracker();
+ if (sessionTracker == null) {
+ createSessionTracker();
+ }
+ startSessionTracker();
setupRequestProcessors();
registerJMX();
@@ -380,6 +385,9 @@ public class ZooKeeperServer implements
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1);
+ }
+
+ protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
@@ -918,4 +926,26 @@ public class ZooKeeperServer implements
// wrap SASL response token to client inside a Response object.
return new SetSASLResponse(responseToken);
}
+
+ public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+ ProcessTxnResult rc;
+ int opCode = hdr.getType();
+ long sessionId = hdr.getClientId();
+ rc = getZKDatabase().processTxn(hdr, txn);
+ if (opCode == OpCode.createSession) {
+ if (txn instanceof CreateSessionTxn) {
+ CreateSessionTxn cst = (CreateSessionTxn) txn;
+ sessionTracker.addSession(sessionId, cst
+ .getTimeOut());
+ } else {
+ LOG.warn("*****>>>>> Got "
+ + txn.getClass() + " "
+ + txn.toString());
+ }
+ } else if (opCode == OpCode.closeSession) {
+ sessionTracker.removeSession(sessionId);
+ }
+ return rc;
+ }
+
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Tue Jan 31 06:45:45 2012
@@ -74,9 +74,13 @@ public class LeaderZooKeeperServer exten
}
@Override
- protected void createSessionTracker() {
+ public void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
tickTime, self.getId());
+ }
+
+ @Override
+ protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Tue Jan 31 06:45:45 2012
@@ -352,7 +352,8 @@ public class Learner {
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
+ zk.createSessionTracker();
+
long lastQueued = 0;
// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
@@ -383,7 +384,7 @@ public class Learner {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
- zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+ zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
@@ -393,7 +394,7 @@ public class Learner {
case Leader.INFORM:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
- zk.getZKDatabase().processTxn(hdr, txn);
+ zk.processTxn(hdr, txn);
break;
case Leader.UPTODATE:
if (!snapshotTaken) { // true for the pre v1.0 case
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Tue Jan 31 06:45:45 2012
@@ -69,12 +69,15 @@ public abstract class LearnerZooKeeperSe
}
@Override
- protected void createSessionTracker() {
+ public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
self.getId());
}
@Override
+ protected void startSessionTracker() {}
+
+ @Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
getLearner().validateSession(cnxn, sessionId, sessionTimeout);
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java Tue Jan 31 06:45:45 2012
@@ -44,8 +44,8 @@ public class LearnerTest extends ZKTestC
Learner learner;
- public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
- super(ftsl, 2000, 2000, 2000, new ZKDatabase(ftsl), null);
+ public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl, QuorumPeer self) throws IOException {
+ super(ftsl, 2000, 2000, 2000, new ZKDatabase(ftsl), self);
}
@Override
@@ -57,7 +57,7 @@ public class LearnerTest extends ZKTestC
static class SimpleLearner extends Learner {
SimpleLearner(FileTxnSnapLog ftsl) throws IOException {
self = new QuorumPeer();
- zk = new SimpleLearnerZooKeeperServer(ftsl);
+ zk = new SimpleLearnerZooKeeperServer(ftsl, self);
((SimpleLearnerZooKeeperServer) zk).learner = this;
}
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1238176&r1=1238175&r2=1238176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Tue Jan 31 06:45:45 2012
@@ -52,6 +52,7 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
@@ -549,6 +550,110 @@ public class Zab1_0Test {
}
@Test
+ public void testNormalFollowerRunWithDiff() throws Exception {
+ testFollowerConversation(new FollowerConversation() {
+ @Override
+ public void converseWithFollower(InputArchive ia, OutputArchive oa,
+ Follower f) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
+ File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+ try {
+ Assert.assertEquals(0, f.self.getAcceptedEpoch());
+ Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+ final long firstZxid = ZxidUtils.makeZxid(1, 1);
+ zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+ Stat stat = new Stat();
+ Assert.assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.FOLLOWERINFO, qp.getType());
+ Assert.assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+ ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+ Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ Assert.assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte protoBytes[] = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+ Assert.assertEquals(0, qp.getZxid());
+ Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+ Assert.assertEquals(1, f.self.getAcceptedEpoch());
+ Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+ // Send a diff
+ qp.setType(Leader.DIFF);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ final long createSessionZxid = ZxidUtils.makeZxid(1, 2);
+ proposeNewSession(qp, createSessionZxid, 0x333);
+ oa.writeRecord(qp, null);
+ qp.setType(Leader.COMMIT);
+ qp.setZxid(createSessionZxid);
+ oa.writeRecord(qp, null);
+ qp.setType(Leader.NEWLEADER);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ oa.writeRecord(qp, null);
+ qp.setType(Leader.UPTODATE);
+ qp.setZxid(0);
+ oa.writeRecord(qp, null);
+
+ // Read the uptodate ack
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+
+ // Get the ack of the new leader
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ Assert.assertEquals(1, f.self.getAcceptedEpoch());
+ Assert.assertEquals(1, f.self.getCurrentEpoch());
+
+ Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());
+
+ // Make sure the data was recorded in the filesystem ok
+ ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+ zkDb2.loadDataBase();
+ System.out.println(zkDb2.getSessions());
+ Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
+ } finally {
+ recursiveDelete(tmpDir);
+ }
+
+ }
+
+ private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throws IOException {
+ qp.setType(Leader.PROPOSAL);
+ qp.setZxid(zxid);
+ TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.createSession);
+ CreateSessionTxn cst = new CreateSessionTxn(30000);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ boa.writeRecord(hdr, null);
+ boa.writeRecord(cst, null);
+ qp.setData(baos.toByteArray());
+ }
+ });
+ }
+
+ @Test
public void testNormalRun() throws Exception {
testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
|