Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 70E4810F14 for ; Thu, 26 Sep 2013 12:37:44 +0000 (UTC) Received: (qmail 21068 invoked by uid 500); 26 Sep 2013 12:37:41 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 20963 invoked by uid 500); 26 Sep 2013 12:37:37 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 20921 invoked by uid 99); 26 Sep 2013 12:37:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 12:37:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 12:37:32 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BB0992388A32; Thu, 26 Sep 2013 12:37:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1526461 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Thu, 26 Sep 2013 12:37:12 -0000 To: commits@zookeeper.apache.org From: fpj@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130926123712.BB0992388A32@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fpj Date: Thu Sep 26 12:37:12 2013 New Revision: 1526461 URL: http://svn.apache.org/r1526461 Log: ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German Blanco via fpj) Modified: zookeeper/branches/branch-3.4/CHANGES.txt zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Modified: zookeeper/branches/branch-3.4/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1526461&r1=1526460&r2=1526461&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/CHANGES.txt (original) +++ zookeeper/branches/branch-3.4/CHANGES.txt Thu Sep 26 12:37:12 2013 @@ -112,6 +112,8 @@ BUGFIXES: ZOOKEEPER-1696. Fail to run zookeeper client on Weblogic application server. (Jeffrey Zhong via mahadev). + ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German Blanco via fpj) + IMPROVEMENTS: ZOOKEEPER-1564. Allow JUnit test build with IBM Java Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1526461&r1=1526460&r2=1526461&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original) +++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Sep 26 12:37:12 2013 @@ -91,6 +91,63 @@ public class LearnerHandler extends Thre final LinkedBlockingQueue queuedPackets = new LinkedBlockingQueue(); + /** + * This class controls the time that the Leader has been + * waiting for acknowledgement of a proposal from this Learner. + * If the time is above syncLimit, the connection will be closed. + * It keeps track of only one proposal at a time, when the ACK for + * that proposal arrives, it switches to the last proposal received + * or clears the value if there is no pending proposal. + */ + private class SyncLimitCheck { + private boolean started = false; + private long currentZxid = 0; + private long currentTime = 0; + private long nextZxid = 0; + private long nextTime = 0; + + public synchronized void start() { + started = true; + } + + public synchronized void updateProposal(long zxid, long time) { + if (!started) { + return; + } + if (currentTime == 0) { + currentTime = time; + currentZxid = zxid; + } else { + nextTime = time; + nextZxid = zxid; + } + } + + public synchronized void updateAck(long zxid) { + if (currentZxid == zxid) { + currentTime = nextTime; + currentZxid = nextZxid; + nextTime = 0; + nextZxid = 0; + } else if (nextZxid == zxid) { + LOG.warn("ACK for " + zxid + " received before ACK for " + currentZxid + "!!!!"); + nextTime = 0; + nextZxid = 0; + } + } + + public synchronized boolean check(long time) { + if (currentTime == 0) { + return true; + } else { + long msDelay = (time - currentTime) / 1000000; + return (msDelay < (leader.self.tickTime * leader.self.syncLimit)); + } + } + }; + + private SyncLimitCheck syncLimitCheck = new SyncLimitCheck(); + private BinaryInputArchive ia; private BinaryOutputArchive oa; @@ -148,6 +205,9 @@ public class LearnerHandler extends Thre if (p.getType() == Leader.PING) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } + if (p.getType() == Leader.PROPOSAL) { + syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime()); + } if (LOG.isTraceEnabled()) { ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p); } @@ -461,6 +521,8 @@ public class LearnerHandler extends Thre } LOG.info("Received NEWLEADER-ACK message from " + getSid()); leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); + + syncLimitCheck.start(); // now that the ack has been processed expect the syncLimit sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit); @@ -505,6 +567,7 @@ public class LearnerHandler extends Thre LOG.debug("Received ACK from Observer " + this.sid); } } + syncLimitCheck.updateAck(qp.getZxid()); leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.PING: @@ -614,12 +677,16 @@ public class LearnerHandler extends Thre */ public void ping() { long id; - synchronized(leader) { - id = leader.lastProposed; + if (syncLimitCheck.check(System.nanoTime())) { + synchronized(leader) { + id = leader.lastProposed; + } + QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null); + queuePacket(ping); + } else { + LOG.warn("Closing connection to peer due to transaction timeout."); + shutdown(); } - QuorumPacket ping = new QuorumPacket(Leader.PING, id, - null, null); - queuePacket(ping); } void queuePacket(QuorumPacket p) { Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1526461&r1=1526460&r2=1526461&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Sep 26 12:37:12 2013 @@ -27,6 +27,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.EOFException; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -67,6 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Zab1_0Test { + private static final int SYNC_LIMIT = 2; + private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class); private static final class LeadThread extends Thread { @@ -833,6 +836,86 @@ public class Zab1_0Test { }); } + @Test + public void testTxnTimeout() throws Exception { + testLeaderConversation(new LeaderConversation() { + public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) + throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { + Assert.assertEquals(0, l.self.getAcceptedEpoch()); + Assert.assertEquals(0, l.self.getCurrentEpoch()); + + LearnerInfo li = new LearnerInfo(1, 0x10000); + byte liBytes[] = new byte[20]; + ByteBufferOutputStream.record2ByteBuffer(li, + ByteBuffer.wrap(liBytes)); + QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, + liBytes, null); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.LEADERINFO, qp.getType()); + Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), + 0x10000); + Assert.assertEquals(1, l.self.getAcceptedEpoch()); + Assert.assertEquals(0, l.self.getCurrentEpoch()); + + qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.DIFF, qp.getType()); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.NEWLEADER, qp.getType()); + Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + Assert.assertEquals(1, l.self.getAcceptedEpoch()); + Assert.assertEquals(1, l.self.getCurrentEpoch()); + + qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.UPTODATE, qp.getType()); + + l.propose(createNodeRequest(l.zk.getZxid())); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.PROPOSAL, qp.getType()); + + LOG.info("Proposal sent."); + + for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) { + try { + ia.readRecord(qp, null); + LOG.info("Ping received: " + i); + qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null); + oa.writeRecord(qp, null); + } catch (EOFException e) { + return; + } + } + + Assert.fail("Connection hasn't been closed by leader after transaction times out."); + } + + private Request createNodeRequest(long zxid) throws IOException { + TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create); + CreateTxn ct = new CreateTxn("/foo", "data".getBytes(), null, true, 0); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, "header"); + boa.writeRecord(ct, "txn"); + baos.close(); + Request rq = new Request(null, 1, 1, ZooDefs.OpCode.create, ByteBuffer.wrap(baos.toByteArray()), null); + rq.zxid = zxid; + rq.hdr = hdr; + rq.txn = ct; + return rq; + } + }); + } + private void deserializeSnapshot(InputArchive ia) throws IOException { ZKDatabase zkdb = new ZKDatabase(null); @@ -977,7 +1060,7 @@ public class Zab1_0Test { private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException { QuorumPeer peer = new QuorumPeer(); - peer.syncLimit = 2; + peer.syncLimit = SYNC_LIMIT; peer.initLimit = 2; peer.tickTime = 2000; peer.quorumPeers = new HashMap(); Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1526461&r1=1526460&r2=1526461&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Sep 26 12:37:12 2013 @@ -354,7 +354,7 @@ public class QuorumTest extends ZKTestCa throws IOException, InterruptedException, KeeperException{ final Semaphore sem = new Semaphore(0); - QuorumUtil qu = new QuorumUtil(2); + QuorumUtil qu = new QuorumUtil(2, 10); qu.startQuorum(); Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1526461&r1=1526460&r2=1526461&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Sep 26 12:37:12 2013 @@ -79,7 +79,7 @@ public class QuorumUtil { * @param n * number of peers in the ensemble will be 2n+1 */ - public QuorumUtil(int n) throws RuntimeException { + public QuorumUtil(int n, int syncLimit) throws RuntimeException { try { ClientBase.setupTestEnv(); JMXEnv.setUp(); @@ -88,7 +88,7 @@ public class QuorumUtil { ALL = 2 * N + 1; tickTime = 2000; initLimit = 3; - syncLimit = 3; + this.syncLimit = syncLimit; electionAlg = 3; hostPort = ""; @@ -116,6 +116,10 @@ public class QuorumUtil { } } + public QuorumUtil(int n) throws RuntimeException { + this(n, 3); + } + public PeerStruct getPeer(int id) { return peers.get(id); }