Return-Path: Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: (qmail 77221 invoked from network); 22 Jan 2011 19:39:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Jan 2011 19:39:21 -0000 Received: (qmail 57985 invoked by uid 500); 22 Jan 2011 19:39:21 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 57941 invoked by uid 500); 22 Jan 2011 19:39:20 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 57933 invoked by uid 99); 22 Jan 2011 19:39:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Jan 2011 19:39:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Jan 2011 19:39:18 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4E0B223889EA; Sat, 22 Jan 2011 19:38:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1062244 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Sat, 22 Jan 2011 19:38:58 -0000 To: commits@zookeeper.apache.org From: breed@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110122193858.4E0B223889EA@eris.apache.org> Author: breed Date: Sat Jan 22 19:38:57 2011 New Revision: 1062244 URL: http://svn.apache.org/viewvc?rev=1062244&view=rev Log: ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Modified: zookeeper/trunk/CHANGES.txt zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Modified: zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/CHANGES.txt (original) +++ zookeeper/trunk/CHANGES.txt Sat Jan 22 19:38:57 2011 @@ -168,6 +168,8 @@ BUGFIXES: ZOOKEEPER-882. Startup loads last transaction from snapshot (j:ared via fpj) + ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF (camille fournier via breed) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Sat Jan 22 19:38:57 2011 @@ -26,6 +26,9 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; @@ -33,16 +36,16 @@ import org.apache.jute.OutputArchive; import org.apache.jute.Record; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener; import org.apache.zookeeper.server.quorum.Leader; -import org.apache.zookeeper.server.quorum.QuorumPacket; import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.server.quorum.QuorumPacket; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.txn.TxnHeader; @@ -67,6 +70,7 @@ public class ZKDatabase { public static final int commitLogCount = 500; protected static int commitLogBuffer = 700; protected LinkedList committedLog = new LinkedList(); + protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock(); volatile private boolean initialized = false; /** @@ -104,8 +108,12 @@ public class ZKDatabase { */ dataTree = new DataTree(); sessionsWithTimeouts.clear(); - synchronized (committedLog) { + WriteLock lock = logLock.writeLock(); + try { + lock.lock(); committedLog.clear(); + } finally { + lock.unlock(); } initialized = false; } @@ -136,13 +144,30 @@ public class ZKDatabase { public long getminCommittedLog() { return minCommittedLog; } - - public LinkedList getCommittedLog() { - synchronized (this.committedLog) { - return new LinkedList(this.committedLog); - } + /** + * Get the lock that controls the committedLog. If you want to get the pointer to the committedLog, you need + * to use this lock to acquire a read lock before calling getCommittedLog() + * @return the lock that controls the committed log + */ + public ReentrantReadWriteLock getLogLock() { + return logLock; } + + public synchronized LinkedList getCommittedLog() { + ReadLock rl = logLock.readLock(); + // only make a copy if this thread isn't already holding a lock + if(logLock.getReadHoldCount() <=0) { + try { + rl.lock(); + return new LinkedList(this.committedLog); + } finally { + rl.unlock(); + } + } + return this.committedLog; + } + /** * get the last processed zxid from a datatree * @return the last processed zxid of a datatree @@ -206,7 +231,9 @@ public class ZKDatabase { * @param request committed request */ public void addCommittedProposal(Request request) { - synchronized (committedLog) { + WriteLock wl = logLock.writeLock(); + try { + wl.lock(); if (committedLog.size() > commitLogCount) { committedLog.removeFirst(); minCommittedLog = committedLog.getFirst().packet.getZxid(); @@ -234,6 +261,8 @@ public class ZKDatabase { p.request = request; committedLog.add(p); maxCommittedLog = p.packet.getZxid(); + } finally { + wl.unlock(); } } Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Sat Jan 22 19:38:57 2011 @@ -46,6 +46,7 @@ public class CommitProcessor extends Thr LinkedList committedRequests = new LinkedList(); RequestProcessor nextProcessor; + ArrayList toProcess = new ArrayList(); /** * This flag indicates whether we need to wait for a response to come back from the @@ -65,8 +66,7 @@ public class CommitProcessor extends Thr @Override public void run() { try { - Request nextPending = null; - ArrayList toProcess = new ArrayList(); + Request nextPending = null; while (!finished) { int len = toProcess.size(); for (int i = 0; i < len; i++) { Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Sat Jan 22 19:38:57 2011 @@ -125,8 +125,7 @@ public class Follower extends Learner{ fzk.commit(qp.getZxid()); break; case Leader.UPTODATE: - fzk.takeSnapshot(); - self.cnxnFactory.setZooKeeperServer(fzk); + LOG.error("Received an UPTODATE message after Follower started"); break; case Leader.REVALIDATE: revalidate(qp); Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Sat Jan 22 19:38:57 2011 @@ -28,6 +28,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -35,11 +36,14 @@ import org.apache.jute.BinaryInputArchiv import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; +import org.apache.jute.Record; import org.apache.log4j.Logger; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.txn.TxnHeader; /** * This class is the superclass of two of the three main actors in a ZK @@ -47,6 +51,10 @@ import org.apache.zookeeper.server.quoru * a good deal of code which is moved into Peer to avoid duplication. */ public class Learner { + static class PacketInFlight { + TxnHeader hdr; + Record rec; + } QuorumPeer self; LearnerZooKeeperServer zk; @@ -275,7 +283,8 @@ public class Learner { QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); - readPacket(qp); + readPacket(qp); + LinkedList packetsNotCommitted = new LinkedList(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); @@ -289,7 +298,7 @@ public class Learner { String signature = leaderIs.readString("signature"); if (!signature.equals("BenWasHere")) { LOG.error("Missing signature. Got " + signature); - throw new IOException("Missing signature"); + throw new IOException("Missing signature"); } } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader @@ -310,15 +319,63 @@ public class Learner { System.exit(13); } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); if(LOG.isInfoEnabled()){ LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L)); } - zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid); + + long lastQueued = 0; + // we are now going to start getting transactions to apply followed by an UPTODATE + outerLoop: + while (self.isRunning()) { + readPacket(qp); + switch(qp.getType()) { + case Leader.PROPOSAL: + PacketInFlight pif = new PacketInFlight(); + pif.hdr = new TxnHeader(); + BinaryInputArchive ia = BinaryInputArchive + .getArchive(new ByteArrayInputStream(qp.getData())); + pif.rec = SerializeUtils.deserializeTxn(ia, pif.hdr); + if (pif.hdr. getZxid() != lastQueued + 1) { + LOG.warn("Got zxid 0x" + + Long.toHexString(pif.hdr.getZxid()) + + " expected 0x" + + Long.toHexString(lastQueued + 1)); + } + lastQueued = pif.hdr.getZxid(); + packetsNotCommitted.add(pif); + break; + case Leader.COMMIT: + pif = packetsNotCommitted.peekFirst(); + if (pif.hdr.getZxid() != qp.getZxid()) { + LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); + } else { + zk.getZKDatabase().processTxn(pif.hdr, pif.rec); + packetsNotCommitted.remove(); + } + break; + case Leader.INFORM: + TxnHeader hdr = new TxnHeader(); + ia = BinaryInputArchive + .getArchive(new ByteArrayInputStream(qp.getData())); + Record txn = SerializeUtils.deserializeTxn(ia, hdr); + zk.getZKDatabase().processTxn(hdr, txn); + break; + case Leader.UPTODATE: + zk.takeSnapshot(); + self.cnxnFactory.setZooKeeperServer(zk); + break outerLoop; + } + } } ack.setZxid(newLeaderZxid & ~0xffffffffL); writePacket(ack, true); sock.setSoTimeout(self.tickTime * self.syncLimit); zk.startup(); + //We have to have a commit processor to do this + for(PacketInFlight p: packetsNotCommitted) { + ((FollowerZooKeeperServer)zk).logRequest(p.hdr, p.rec); + } } protected void revalidate(QuorumPacket qp) throws IOException { Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sat Jan 22 19:38:57 2011 @@ -29,6 +29,8 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; @@ -254,79 +256,72 @@ public class LearnerHandler extends Thre long peerLastZxid = qp.getZxid(); /* the default to send to the follower */ int packetToSend = Leader.SNAP; - boolean logTxns = true; long zxidToSend = 0; - + long leaderLastZxid = 0; /** the packets that the follower needs to get updates from **/ long updates = peerLastZxid; /* we are sending the diff check if we have proposals in memory to be able to * send a diff to the */ - LinkedList proposals = leader.zk.getZKDatabase().getCommittedLog(); - synchronized(proposals) { + ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock(); + ReadLock rl = lock.readLock(); + try { + rl.lock(); + final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); + final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); + LinkedList proposals = leader.zk.getZKDatabase().getCommittedLog(); if (proposals.size() != 0) { - if ((leader.zk.getZKDatabase().getmaxCommittedLog() >= peerLastZxid) - && (leader.zk.getZKDatabase().getminCommittedLog() <= peerLastZxid)) { + if ((maxCommittedLog >= peerLastZxid) + && (minCommittedLog <= peerLastZxid)) { packetToSend = Leader.DIFF; - zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog(); + zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { if (propose.packet.getZxid() > peerLastZxid) { queuePacket(propose.packet); QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); - } } + } else if (peerLastZxid > maxCommittedLog) { + packetToSend = Leader.TRUNC; + zxidToSend = maxCommittedLog; + updates = zxidToSend; } + } else { + // just let the state transfer happen + } + + leaderLastZxid = leader.startForwarding(this, updates); + if (peerLastZxid == leaderLastZxid) { + // We are in sync so we'll do an empty diff + packetToSend = Leader.DIFF; + zxidToSend = leaderLastZxid; } - else { - logTxns = false; - } - } - - //check if we decided to send a diff or we need to send a truncate - // we avoid using epochs for truncating because epochs make things - // complicated. Two epochs might have the last 32 bits as same. - // only if we know that there is a committed zxid in the queue that - // is less than the one the peer has we send a trunc else to make - // things simple we just send sanpshot. - if (logTxns && (peerLastZxid > leader.zk.getZKDatabase().getmaxCommittedLog())) { - // this is the only case that we are sure that - // we can ask the peer to truncate the log - packetToSend = Leader.TRUNC; - zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog(); - updates = zxidToSend; - } - - /* see what other packets from the proposal - * and tobeapplied queues need to be sent - * and then decide if we can just send a DIFF - * or we actually need to send the whole snapshot - */ - long leaderLastZxid = leader.startForwarding(this, updates); - // a special case when both the ids are the same - if (peerLastZxid == leaderLastZxid) { - packetToSend = Leader.DIFF; - zxidToSend = leaderLastZxid; + } finally { + rl.unlock(); } QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, leaderLastZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); bufferedOutput.flush(); - - + //Need to set the zxidToSend to the latest zxid + if (packetToSend == Leader.SNAP) { + zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); + } oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); /* if we are not truncating or sending a diff just send a snapshot */ if (packetToSend == Leader.SNAP) { - LOG.warn("Sending snapshot last zxid of peer is 0x" + LOG.info("Sending snapshot last zxid of peer is 0x" + Long.toHexString(peerLastZxid) + " " + " zxid of leader is 0x" - + Long.toHexString(leaderLastZxid)); + + Long.toHexString(leaderLastZxid) + + "sent zxid of db as 0x" + + Long.toHexString(zxidToSend)); // Dump data to peer leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); @@ -524,6 +519,6 @@ public class LearnerHandler extends Thre public boolean synced() { return isAlive() - && tickOfLastAck >= leader.self.tick - leader.self.syncLimit; + && tickOfLastAck >= leader.self.tick - leader.self.syncLimit; } } Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Sat Jan 22 19:38:57 2011 @@ -111,8 +111,7 @@ public class Observer extends Learner{ LOG.warn("Ignoring commit"); break; case Leader.UPTODATE: - zk.takeSnapshot(); - self.cnxnFactory.setZooKeeperServer(zk); + LOG.error("Received an UPTODATE message after Observer started"); break; case Leader.REVALIDATE: revalidate(qp); Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1062244&view=auto ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (added) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sat Jan 22 19:38:57 2011 @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.Test; + + +public class FollowerResyncConcurrencyTest extends QuorumBase { + volatile int counter = 0; + volatile int errors = 0; + + private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + + /** + * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this, + * setting the ZXID of the SNAP packet + * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down + * The non-leader ZKs are writing to cluster + * Shut down F1 again + * Restart after sessions are expired, expect to get a snap file + * Shut down, run some transactions through. + * Restart to a diff while transactions are running in leader + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test + public void testResyncBySnapThenDiffAfterFollowerCrashes () + throws IOException, InterruptedException, KeeperException, Throwable{ + final Semaphore sem = new Semaphore(0); + + QuorumUtil qu = new QuorumUtil(1); + qu.startAll(); + CountdownWatcher watcher1 = new CountdownWatcher(); + CountdownWatcher watcher2 = new CountdownWatcher(); + CountdownWatcher watcher3 = new CountdownWatcher(); + + int index = 1; + while(qu.getPeer(index).peer.leader == null) + index++; + + Leader leader = qu.getPeer(index).peer.leader; + + assertNotNull(leader); + /* + * Reusing the index variable to select a follower to connect to + */ + index = (index == 1) ? 2 : 1; + qu.shutdown(index); + final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000,watcher3); + watcher3.waitForConnected(CONNECTION_TIMEOUT); + zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + qu.restart(index); + ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1); + + ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2); + + watcher1.waitForConnected(CONNECTION_TIMEOUT); + watcher2.waitForConnected(CONNECTION_TIMEOUT); + + zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + for(int i = 0; i < 1000; i++) { + zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + counter++; + if (rc != 0) { + errors++; + } + if(counter == 14200){ + sem.release(); + } + + + } + }, null); + if(i%10==0){ + try { + Thread.sleep(100); + } catch (Exception e) { + + } + } + } + + } + }); + + + for(int i = 0; i < 13000; i++) { + zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + counter++; + if (rc != 0) { + errors++; + } + if(counter == 14200){ + sem.release(); + } + + + } + }, null); + + if(i == 5000){ + qu.shutdown(index); + LOG.info("Shutting down s1"); + } + if(i == 12000){ + //Restart off of snap, then get some txns for a log, then shut down + qu.restart(index); + Thread.sleep(300); + qu.shutdown(index); + t.start(); + Thread.sleep(300); + qu.restart(index); + LOG.info("Setting up server: " + index); + } + if((i % 1000) == 0){ + Thread.sleep(1000); + } + + if(i%50 == 0) { + zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + counter++; + if (rc != 0) { + errors++; + } + if(counter == 14200){ + sem.release(); + } + + + } + }, null); + } + } + + // Wait until all updates return + if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) { + LOG.warn("Did not aquire semaphore fast enough"); + } + t.join(10000); + Thread.sleep(1000); + + verifyState(qu, index, leader); + + } + + /** + * This test: + * Starts up 3 ZKs. The non-leader ZKs are writing to cluster + * Shut down one of the non-leader ZKs. + * Restart after sessions have expired but <500 txns have taken place (get a diff) + * Shut down immediately after restarting, start running separate thread with other transactions + * Restart to a diff while transactions are running in leader + * + * + * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that + * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions + * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions + * would be missed + * + * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed, + * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions + * during the leader's diff forwarding. + * + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + * @throws Throwable + */ + + @Test + public void testResyncByDiffAfterFollowerCrashes () + throws IOException, InterruptedException, KeeperException, Throwable{ + final Semaphore sem = new Semaphore(0); + + QuorumUtil qu = new QuorumUtil(1); + qu.startAll(); + CountdownWatcher watcher1 = new CountdownWatcher(); + CountdownWatcher watcher2 = new CountdownWatcher(); + CountdownWatcher watcher3 = new CountdownWatcher(); + + + int index = 1; + while(qu.getPeer(index).peer.leader == null) + index++; + + Leader leader = qu.getPeer(index).peer.leader; + + assertNotNull(leader); + + /* + * Reusing the index variable to select a follower to connect to + */ + index = (index == 1) ? 2 : 1; + + ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1); + + ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000,watcher2); + final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3); + watcher1.waitForConnected(CONNECTION_TIMEOUT); + watcher2.waitForConnected(CONNECTION_TIMEOUT); + watcher3.waitForConnected(CONNECTION_TIMEOUT); + zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + + final AtomicBoolean runNow = new AtomicBoolean(false); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int inSyncCounter = 0; + while(inSyncCounter < 400) { + if(runNow.get()) { + zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + counter++; + if (rc != 0) { + errors++; + } + if(counter > 7300){ + sem.release(); + } + + + } + }, null); + + try { + Thread.sleep(10); + } catch (Exception e) { + } + inSyncCounter++; + } + else { + Thread.yield(); + } + } + + } + }); + + t.start(); + for(int i = 0; i < 5000; i++) { + zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + counter++; + if (rc != 0) { + errors++; + } + if(counter > 7300){ + sem.release(); + } + + + } + }, null); + + if(i == 1000){ + qu.shutdown(index); + Thread.sleep(1100); + LOG.info("Shutting down s1"); + + } + if(i == 1100 || i == 1150 || i == 1200) { + Thread.sleep(1000); + } + + if(i == 1200){ + qu.startThenShutdown(index); + runNow.set(true); + qu.restart(index); + LOG.info("Setting up server: " + index); + } + + + if(i>=1000 && i%2== 0) { + zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + counter++; + if (rc != 0) { + errors++; + } + if(counter > 7300){ + sem.release(); + } + + + } + }, null); + } + if(i == 1050 || i == 1100 || i == 1150) { + Thread.sleep(1000); + } + } + + // Wait until all updates return + if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) { + LOG.warn("Did not aquire semaphore fast enough"); + } + t.join(10000); + Thread.sleep(1000); + // Verify that server is following and has the same epoch as the leader + + verifyState(qu, index, leader); + + } + + private void verifyState(QuorumUtil qu, int index, Leader leader) { + assertTrue("Not following", qu.getPeer(index).peer.follower != null); + long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L); + long epochL = (leader.getEpoch() >> 32L); + assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + + "Current epoch: " + epochF, epochF == epochL); + int leaderIndex = (index == 1) ? 2 : 1; + Collection sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions(); + Collection sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions(); + + for(Long l : sessionsRestarted) { + assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l)); + } + assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size()); + ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase(); + ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase(); + ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase(); + for(Long l : sessionsRestarted) { + assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l)); + HashSet ephemerals = restarted.getEphemerals(l); + HashSet cleanEphemerals = clean.getEphemerals(l); + for(Object o : cleanEphemerals) { + if(!ephemerals.contains(o)) { + LOG.info("Restarted follower doesn't contain ephemeral " + o); + } + } + HashSet leadEphemerals = lead.getEphemerals(l); + for(Object o : leadEphemerals) { + if(!cleanEphemerals.contains(o)) { + LOG.info("Follower doesn't contain ephemeral from leader " + o); + } + } + assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size()); + assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size()); + } + } +} Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1062244&r1=1062243&r2=1062244&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Sat Jan 22 19:38:57 2011 @@ -61,9 +61,9 @@ public class QuorumUtil { private final Map peers = new HashMap(); - private final int N; + public final int N; - private final int ALL; + public final int ALL; private String hostPort; @@ -123,6 +123,7 @@ public class QuorumUtil { } public void startAll() throws IOException { + shutdownAll(); for (int i = 1; i <= ALL; ++i) { start(i); LOG.info("Started QuorumPeer " + i); @@ -182,7 +183,26 @@ public class QuorumUtil { ps.id, tickTime, initLimit, syncLimit); Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + ps.peer.start(); + } + + public void restart(int id) throws IOException { + start(id); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT)); + } + + public void startThenShutdown(int id) throws IOException { + PeerStruct ps = getPeer(id); + LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort); + ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, + ps.id, tickTime, initLimit, syncLimit); + Assert.assertEquals(ps.clientPort, ps.peer.getClientPort()); + ps.peer.start(); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT)); + shutdown(id); } public void shutdownAll() {