Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D3D8810E04 for ; Thu, 26 Sep 2013 12:06:22 +0000 (UTC) Received: (qmail 51007 invoked by uid 500); 26 Sep 2013 12:06:22 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 50879 invoked by uid 500); 26 Sep 2013 12:06:17 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 50854 invoked by uid 99); 26 Sep 2013 12:06:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 12:06:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 12:06:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1A16A2388831; Thu, 26 Sep 2013 12:05:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1526454 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Thu, 26 Sep 2013 12:05:47 -0000 To: commits@zookeeper.apache.org From: fpj@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130926120548.1A16A2388831@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fpj Date: Thu Sep 26 12:05:47 2013 New Revision: 1526454 URL: http://svn.apache.org/r1526454 Log: ZOOKEEPER-87. 
Follower does not shut itself down if it's too far behind the leader. (German Blanco via fpj) Modified: zookeeper/trunk/CHANGES.txt zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Modified: zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1526454&r1=1526453&r2=1526454&view=diff ============================================================================== --- zookeeper/trunk/CHANGES.txt (original) +++ zookeeper/trunk/CHANGES.txt Thu Sep 26 12:05:47 2013 @@ -387,11 +387,12 @@ BUGFIXES: which are used to ping RwServer (Rakesh R via fpj) ZOOKEEPER-1096. Leader communication should listen on specified IP, not wildcard address (Jared Cantwell, German Blanco via fpj) - ZOOKEEPER-1696. Fail to run zookeeper client on Weblogic application server. (Jeffrey Zhong via mahadev) + ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German Blanco via fpj) + IMPROVEMENTS: ZOOKEEPER-1170. 
Fix compiler (eclipse) warnings: unused imports, Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1526454&r1=1526453&r2=1526454&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Sep 26 12:05:47 2013 @@ -92,6 +92,63 @@ public class LearnerHandler extends Thre final LinkedBlockingQueue queuedPackets = new LinkedBlockingQueue(); + /** + * This class controls the time that the Leader has been + * waiting for acknowledgement of a proposal from this Learner. + * If the time is above syncLimit, the connection will be closed. + * It keeps track of only one proposal at a time, when the ACK for + * that proposal arrives, it switches to the last proposal received + * or clears the value if there is no pending proposal. 
+ */ + private class SyncLimitCheck { + private boolean started = false; + private long currentZxid = 0; + private long currentTime = 0; + private long nextZxid = 0; + private long nextTime = 0; + + public synchronized void start() { + started = true; + } + + public synchronized void updateProposal(long zxid, long time) { + if (!started) { + return; + } + if (currentTime == 0) { + currentTime = time; + currentZxid = zxid; + } else { + nextTime = time; + nextZxid = zxid; + } + } + + public synchronized void updateAck(long zxid) { + if (currentZxid == zxid) { + currentTime = nextTime; + currentZxid = nextZxid; + nextTime = 0; + nextZxid = 0; + } else if (nextZxid == zxid) { + LOG.warn("ACK for " + zxid + " received before ACK for " + currentZxid + "!!!!"); + nextTime = 0; + nextZxid = 0; + } + } + + public synchronized boolean check(long time) { + if (currentTime == 0) { + return true; + } else { + long msDelay = (time - currentTime) / 1000000; + return (msDelay < (leader.self.tickTime * leader.self.syncLimit)); + } + } + }; + + private SyncLimitCheck syncLimitCheck = new SyncLimitCheck(); + private BinaryInputArchive ia; private BinaryOutputArchive oa; @@ -171,6 +228,9 @@ public class LearnerHandler extends Thre if (p.getType() == Leader.PING) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } + if (p.getType() == Leader.PROPOSAL) { + syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime()); + } if (LOG.isTraceEnabled()) { ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p); } @@ -396,6 +456,8 @@ public class LearnerHandler extends Thre LOG.debug("Received NEWLEADER-ACK message from " + sid); } leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); + + syncLimitCheck.start(); // now that the ack has been processed expect the syncLimit sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit); @@ -441,6 +503,7 @@ public class LearnerHandler extends Thre LOG.debug("Received ACK from Observer " + this.sid); } } + 
syncLimitCheck.updateAck(qp.getZxid()); leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.PING: @@ -822,12 +885,16 @@ public class LearnerHandler extends Thre return; } long id; - synchronized(leader) { - id = leader.lastProposed; + if (syncLimitCheck.check(System.nanoTime())) { + synchronized(leader) { + id = leader.lastProposed; + } + QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null); + queuePacket(ping); + } else { + LOG.warn("Closing connection to peer due to transaction timeout."); + shutdown(); } - QuorumPacket ping = new QuorumPacket(Leader.PING, id, - null, null); - queuePacket(ping); } /** Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1526454&r1=1526453&r2=1526454&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Sep 26 12:05:47 2013 @@ -27,6 +27,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.EOFException; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -67,6 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Zab1_0Test { + private static final int SYNC_LIMIT = 2; + private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class); private static final class LeadThread extends Thread { @@ -837,6 +840,73 @@ public class Zab1_0Test { }); } + @Test + public void testTxnTimeout() throws Exception { + testLeaderConversation(new LeaderConversation() { + public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) + throws 
IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException { + Assert.assertEquals(0, l.self.getAcceptedEpoch()); + Assert.assertEquals(0, l.self.getCurrentEpoch()); + + LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + byte liBytes[] = new byte[20]; + ByteBufferOutputStream.record2ByteBuffer(li, + ByteBuffer.wrap(liBytes)); + QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, + liBytes, null); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.LEADERINFO, qp.getType()); + Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), + 0x10000); + Assert.assertEquals(1, l.self.getAcceptedEpoch()); + Assert.assertEquals(0, l.self.getCurrentEpoch()); + + qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.DIFF, qp.getType()); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.NEWLEADER, qp.getType()); + Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + Assert.assertEquals(1, l.self.getAcceptedEpoch()); + Assert.assertEquals(1, l.self.getCurrentEpoch()); + + qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.UPTODATE, qp.getType()); + + long zxid = l.zk.getZxid(); + l.propose(new Request(1, 1, ZooDefs.OpCode.create, + new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create), + new CreateTxn("/test", "hola".getBytes(), null, true, 0), zxid)); + + readPacketSkippingPing(ia, qp); + Assert.assertEquals(Leader.PROPOSAL, qp.getType()); + + LOG.info("Proposal sent."); + + for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) { + try { + ia.readRecord(qp, null); + LOG.info("Ping received: " + i); + qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null); + oa.writeRecord(qp, null); + } catch 
(EOFException e) { + return; + } + } + Assert.fail("Connection hasn't been closed by leader after transaction times out."); + } + }); + } + private void deserializeSnapshot(InputArchive ia) throws IOException { ZKDatabase zkdb = new ZKDatabase(null); @@ -981,7 +1051,7 @@ public class Zab1_0Test { private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException { HashMap peers = new HashMap(); QuorumPeer peer = new QuorumPeer(); - peer.syncLimit = 2; + peer.syncLimit = SYNC_LIMIT; peer.initLimit = 2; peer.tickTime = 2000; Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1526454&r1=1526453&r2=1526454&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Sep 26 12:05:47 2013 @@ -352,7 +352,7 @@ public class QuorumTest extends ZKTestCa throws IOException, InterruptedException, KeeperException{ final Semaphore sem = new Semaphore(0); - QuorumUtil qu = new QuorumUtil(2); + QuorumUtil qu = new QuorumUtil(2, 10); qu.startQuorum(); int index = 1; Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1526454&r1=1526453&r2=1526454&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Sep 26 12:05:47 2013 @@ -79,7 +79,7 @@ public class QuorumUtil { * @param n * number of peers in the ensemble will be 2n+1 */ - public QuorumUtil(int n) throws RuntimeException { + public 
QuorumUtil(int n, int syncLimit) throws RuntimeException { try { ClientBase.setupTestEnv(); JMXEnv.setUp(); @@ -88,7 +88,7 @@ public class QuorumUtil { ALL = 2 * N + 1; tickTime = 2000; initLimit = 3; - syncLimit = 3; + this.syncLimit = syncLimit; electionAlg = 3; hostPort = ""; @@ -118,6 +118,10 @@ public class QuorumUtil { } } + public QuorumUtil(int n) throws RuntimeException { + this(n, 3); + } + public PeerStruct getPeer(int id) { return peers.get(id); }