zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1526461 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Thu, 26 Sep 2013 12:37:12 GMT
Author: fpj
Date: Thu Sep 26 12:37:12 2013
New Revision: 1526461

URL: http://svn.apache.org/r1526461
Log:
ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German Blanco via fpj)


Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1526461&r1=1526460&r2=1526461&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Thu Sep 26 12:37:12 2013
@@ -112,6 +112,8 @@ BUGFIXES:
   ZOOKEEPER-1696. Fail to run zookeeper client on Weblogic application server.
   (Jeffrey Zhong via mahadev).
 
+  ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German Blanco via fpj)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1564. Allow JUnit test build with IBM Java

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1526461&r1=1526460&r2=1526461&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Sep 26 12:37:12 2013
@@ -91,6 +91,63 @@ public class LearnerHandler extends Thre
     final LinkedBlockingQueue<QuorumPacket> queuedPackets =
         new LinkedBlockingQueue<QuorumPacket>();
 
+    /**
+     * This class controls the time that the Leader has been
+     * waiting for acknowledgement of a proposal from this Learner.
+     * If the time is above syncLimit, the connection will be closed.
+     * It keeps track of only one proposal at a time, when the ACK for
+     * that proposal arrives, it switches to the last proposal received
+     * or clears the value if there is no pending proposal.
+     */
+    private class SyncLimitCheck {
+        private boolean started = false;
+        private long currentZxid = 0;
+        private long currentTime = 0;
+        private long nextZxid = 0;
+        private long nextTime = 0;
+
+        public synchronized void start() {
+            started = true;
+        }
+
+        public synchronized void updateProposal(long zxid, long time) {
+            if (!started) {
+                return;
+            }
+            if (currentTime == 0) {
+                currentTime = time;
+                currentZxid = zxid;
+            } else {
+                nextTime = time;
+                nextZxid = zxid;
+            }
+        }
+
+        public synchronized void updateAck(long zxid) {
+             if (currentZxid == zxid) {
+                 currentTime = nextTime;
+                 currentZxid = nextZxid;
+                 nextTime = 0;
+                 nextZxid = 0;
+             } else if (nextZxid == zxid) {
+                 LOG.warn("ACK for " + zxid + " received before ACK for " + currentZxid + "!!!!");
+                 nextTime = 0;
+                 nextZxid = 0;
+             }
+        }
+
+        public synchronized boolean check(long time) {
+            if (currentTime == 0) {
+                return true;
+            } else {
+                long msDelay = (time - currentTime) / 1000000;
+                return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
+            }
+        }
+    };
+
+    private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
+
     private BinaryInputArchive ia;
 
     private BinaryOutputArchive oa;
@@ -148,6 +205,9 @@ public class LearnerHandler extends Thre
                 if (p.getType() == Leader.PING) {
                     traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                 }
+                if (p.getType() == Leader.PROPOSAL) {
+                    syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
+                }
                 if (LOG.isTraceEnabled()) {
                     ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                 }
@@ -461,6 +521,8 @@ public class LearnerHandler extends Thre
             }
             LOG.info("Received NEWLEADER-ACK message from " + getSid());
             leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
+
+            syncLimitCheck.start();
             
             // now that the ack has been processed expect the syncLimit
             sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
@@ -505,6 +567,7 @@ public class LearnerHandler extends Thre
                             LOG.debug("Received ACK from Observer  " + this.sid);
                         }
                     }
+                    syncLimitCheck.updateAck(qp.getZxid());
                     leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                     break;
                 case Leader.PING:
@@ -614,12 +677,16 @@ public class LearnerHandler extends Thre
      */
     public void ping() {
         long id;
-        synchronized(leader) {
-            id = leader.lastProposed;
+        if (syncLimitCheck.check(System.nanoTime())) {
+            synchronized(leader) {
+                id = leader.lastProposed;
+            }
+            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
+            queuePacket(ping);
+        } else {
+            LOG.warn("Closing connection to peer due to transaction timeout.");
+            shutdown();
         }
-        QuorumPacket ping = new QuorumPacket(Leader.PING, id,
-                null, null);
-        queuePacket(ping);
     }
 
     void queuePacket(QuorumPacket p) {

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1526461&r1=1526460&r2=1526461&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Sep 26 12:37:12 2013
@@ -27,6 +27,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.EOFException;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -67,6 +68,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Zab1_0Test {
+    private static final int SYNC_LIMIT = 2;
+
     private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);
 
     private static final class LeadThread extends Thread {
@@ -833,6 +836,86 @@ public class Zab1_0Test {
         });
     }
 
+    @Test
+    public void testTxnTimeout() throws Exception {
+        testLeaderConversation(new LeaderConversation() {
+            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
+                    throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException {
+                Assert.assertEquals(0, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
+                LearnerInfo li = new LearnerInfo(1, 0x10000);
+                byte liBytes[] = new byte[20];
+                ByteBufferOutputStream.record2ByteBuffer(li,
+                        ByteBuffer.wrap(liBytes));
+                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
+                        liBytes, null);
+                oa.writeRecord(qp, null);
+                
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+                        0x10000);
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
+                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
+                oa.writeRecord(qp, null);
+                
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.DIFF, qp.getType());
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(1, l.self.getCurrentEpoch());
+                
+                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
+                oa.writeRecord(qp, null);
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.UPTODATE, qp.getType());
+
+                l.propose(createNodeRequest(l.zk.getZxid()));
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.PROPOSAL, qp.getType());
+
+                LOG.info("Proposal sent.");
+
+                for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) {
+                    try {
+                        ia.readRecord(qp, null);
+                        LOG.info("Ping received: " + i);
+                        qp = new  QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null);
+                        oa.writeRecord(qp, null);
+                    } catch (EOFException e) {
+                        return;
+                    }
+                }
+
+                Assert.fail("Connection hasn't been closed by leader after transaction times out.");
+            }
+
+            private Request createNodeRequest(long zxid) throws IOException {
+                TxnHeader hdr = new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create);
+                CreateTxn ct = new CreateTxn("/foo", "data".getBytes(), null, true, 0);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, "header");
+                boa.writeRecord(ct, "txn");
+                baos.close();
+                Request rq = new Request(null, 1, 1, ZooDefs.OpCode.create, ByteBuffer.wrap(baos.toByteArray()), null);
+                rq.zxid = zxid;
+                rq.hdr = hdr;
+                rq.txn = ct;
+                return rq;
+            }
+        });
+    }
+
     private void deserializeSnapshot(InputArchive ia)
             throws IOException {
         ZKDatabase zkdb = new ZKDatabase(null);
@@ -977,7 +1060,7 @@ public class Zab1_0Test {
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
             FileNotFoundException {
         QuorumPeer peer = new QuorumPeer();
-        peer.syncLimit = 2;
+        peer.syncLimit = SYNC_LIMIT;
         peer.initLimit = 2;
         peer.tickTime = 2000;
         peer.quorumPeers = new HashMap<Long, QuorumServer>();

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1526461&r1=1526460&r2=1526461&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Sep 26 12:37:12 2013
@@ -354,7 +354,7 @@ public class QuorumTest extends ZKTestCa
     throws IOException, InterruptedException, KeeperException{
         final Semaphore sem = new Semaphore(0);
                 
-        QuorumUtil qu = new QuorumUtil(2);
+        QuorumUtil qu = new QuorumUtil(2, 10);
         qu.startQuorum();
                 
         

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1526461&r1=1526460&r2=1526461&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Sep 26 12:37:12 2013
@@ -79,7 +79,7 @@ public class QuorumUtil {
      * @param n
      *            number of peers in the ensemble will be 2n+1
      */
-    public QuorumUtil(int n) throws RuntimeException {
+    public QuorumUtil(int n, int syncLimit) throws RuntimeException {
         try {
             ClientBase.setupTestEnv();
             JMXEnv.setUp();
@@ -88,7 +88,7 @@ public class QuorumUtil {
             ALL = 2 * N + 1;
             tickTime = 2000;
             initLimit = 3;
-            syncLimit = 3;
+            this.syncLimit = syncLimit;
             electionAlg = 3;
             hostPort = "";
 
@@ -116,6 +116,10 @@ public class QuorumUtil {
         }
     }
 
+    public QuorumUtil(int n) throws RuntimeException {
+        this(n, 3);
+    }
+
     public PeerStruct getPeer(int id) {
         return peers.get(id);
     }



Mime
View raw message