zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1526454 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Thu, 26 Sep 2013 12:05:47 GMT
Author: fpj
Date: Thu Sep 26 12:05:47 2013
New Revision: 1526454

URL: http://svn.apache.org/r1526454
Log:
ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German
Blanco via fpj)


Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1526454&r1=1526453&r2=1526454&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Sep 26 12:05:47 2013
@@ -387,11 +387,12 @@ BUGFIXES:
                   which are used to ping RwServer (Rakesh R via fpj)
 
   ZOOKEEPER-1096. Leader communication should listen on specified IP, not wildcard address
(Jared Cantwell, German Blanco via fpj)
-
  
   ZOOKEEPER-1696. Fail to run zookeeper client on Weblogic application server.
   (Jeffrey Zhong via mahadev)
 
+  ZOOKEEPER-87. Follower does not shut itself down if its too far behind the leader. (German
Blanco via fpj)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1526454&r1=1526453&r2=1526454&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu
Sep 26 12:05:47 2013
@@ -92,6 +92,63 @@ public class LearnerHandler extends Thre
     final LinkedBlockingQueue<QuorumPacket> queuedPackets =
         new LinkedBlockingQueue<QuorumPacket>();
 
+    /**
+     * This class controls the time that the Leader has been
+     * waiting for acknowledgement of a proposal from this Learner.
+     * If the time is above syncLimit, the connection will be closed.
+     * It keeps track of only one proposal at a time, when the ACK for
+     * that proposal arrives, it switches to the last proposal received
+     * or clears the value if there is no pending proposal.
+     */
+    private class SyncLimitCheck {
+        private boolean started = false;
+        private long currentZxid = 0;
+        private long currentTime = 0;
+        private long nextZxid = 0;
+        private long nextTime = 0;
+
+        public synchronized void start() {
+            started = true;
+        }
+
+        public synchronized void updateProposal(long zxid, long time) {
+            if (!started) {
+                return;
+            }
+            if (currentTime == 0) {
+                currentTime = time;
+                currentZxid = zxid;
+            } else {
+                nextTime = time;
+                nextZxid = zxid;
+            }
+        }
+
+        public synchronized void updateAck(long zxid) {
+             if (currentZxid == zxid) {
+                 currentTime = nextTime;
+                 currentZxid = nextZxid;
+                 nextTime = 0;
+                 nextZxid = 0;
+             } else if (nextZxid == zxid) {
+                 LOG.warn("ACK for " + zxid + " received before ACK for " + currentZxid +
"!!!!");
+                 nextTime = 0;
+                 nextZxid = 0;
+             }
+        }
+
+        public synchronized boolean check(long time) {
+            if (currentTime == 0) {
+                return true;
+            } else {
+                long msDelay = (time - currentTime) / 1000000;
+                return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
+            }
+        }
+    };
+
+    private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
+
     private BinaryInputArchive ia;
 
     private BinaryOutputArchive oa;
@@ -171,6 +228,9 @@ public class LearnerHandler extends Thre
                 if (p.getType() == Leader.PING) {
                     traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                 }
+                if (p.getType() == Leader.PROPOSAL) {
+                    syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
+                }
                 if (LOG.isTraceEnabled()) {
                     ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                 }
@@ -396,6 +456,8 @@ public class LearnerHandler extends Thre
             	LOG.debug("Received NEWLEADER-ACK message from " + sid);   
             }
             leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
+
+            syncLimitCheck.start();
             
             // now that the ack has been processed expect the syncLimit
             sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
@@ -441,6 +503,7 @@ public class LearnerHandler extends Thre
                             LOG.debug("Received ACK from Observer  " + this.sid);
                         }
                     }
+                    syncLimitCheck.updateAck(qp.getZxid());
                     leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                     break;
                 case Leader.PING:
@@ -822,12 +885,16 @@ public class LearnerHandler extends Thre
             return;
         }
         long id;
-        synchronized(leader) {
-            id = leader.lastProposed;
+        if (syncLimitCheck.check(System.nanoTime())) {
+            synchronized(leader) {
+                id = leader.lastProposed;
+            }
+            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
+            queuePacket(ping);
+        } else {
+            LOG.warn("Closing connection to peer due to transaction timeout.");
+            shutdown();
         }
-        QuorumPacket ping = new QuorumPacket(Leader.PING, id,
-                null, null);
-        queuePacket(ping);
     }
 
     /**

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1526454&r1=1526453&r2=1526454&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Sep
26 12:05:47 2013
@@ -27,6 +27,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.EOFException;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -67,6 +68,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Zab1_0Test {
+    private static final int SYNC_LIMIT = 2;
+
     private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);
 
     private static final class LeadThread extends Thread {
@@ -837,6 +840,73 @@ public class Zab1_0Test {
         });
     }
 
+    @Test
+    public void testTxnTimeout() throws Exception {
+        testLeaderConversation(new LeaderConversation() {
+            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
+                    throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException
{
+                Assert.assertEquals(0, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
+                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+                byte liBytes[] = new byte[20];
+                ByteBufferOutputStream.record2ByteBuffer(li,
+                        ByteBuffer.wrap(liBytes));
+                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
+                        liBytes, null);
+                oa.writeRecord(qp, null);
+                
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+                        0x10000);
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
+                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
+                oa.writeRecord(qp, null);
+                
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.DIFF, qp.getType());
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(1, l.self.getCurrentEpoch());
+                
+                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
+                oa.writeRecord(qp, null);
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.UPTODATE, qp.getType());
+
+                long zxid = l.zk.getZxid();
+                l.propose(new Request(1, 1, ZooDefs.OpCode.create,
+                            new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create),
+                            new CreateTxn("/test", "hola".getBytes(), null, true, 0), zxid));
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.PROPOSAL, qp.getType());
+
+                LOG.info("Proposal sent.");
+
+                for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) {
+                    try {
+                        ia.readRecord(qp, null);
+                        LOG.info("Ping received: " + i);
+                        qp = new  QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(),
null);
+                        oa.writeRecord(qp, null);
+                    } catch (EOFException e) {
+                        return;
+                    }
+                }
+                Assert.fail("Connection hasn't been closed by leader after transaction times
out.");
+            }
+        });
+    }
+
     private void deserializeSnapshot(InputArchive ia)
             throws IOException {
         ZKDatabase zkdb = new ZKDatabase(null);
@@ -981,7 +1051,7 @@ public class Zab1_0Test {
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException
{
         HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
         QuorumPeer peer = new QuorumPeer();
-        peer.syncLimit = 2;
+        peer.syncLimit = SYNC_LIMIT;
         peer.initLimit = 2;
         peer.tickTime = 2000;
         

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1526454&r1=1526453&r2=1526454&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Sep 26 12:05:47
2013
@@ -352,7 +352,7 @@ public class QuorumTest extends ZKTestCa
     throws IOException, InterruptedException, KeeperException{
         final Semaphore sem = new Semaphore(0);
 
-        QuorumUtil qu = new QuorumUtil(2);
+        QuorumUtil qu = new QuorumUtil(2, 10);
         qu.startQuorum();
 
         int index = 1;

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1526454&r1=1526453&r2=1526454&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Sep 26 12:05:47
2013
@@ -79,7 +79,7 @@ public class QuorumUtil {
      * @param n
      *            number of peers in the ensemble will be 2n+1
      */
-    public QuorumUtil(int n) throws RuntimeException {
+    public QuorumUtil(int n, int syncLimit) throws RuntimeException {
         try {
             ClientBase.setupTestEnv();
             JMXEnv.setUp();
@@ -88,7 +88,7 @@ public class QuorumUtil {
             ALL = 2 * N + 1;
             tickTime = 2000;
             initLimit = 3;
-            syncLimit = 3;
+            this.syncLimit = syncLimit;
             electionAlg = 3;
             hostPort = "";
 
@@ -118,6 +118,10 @@ public class QuorumUtil {
         }
     }
 
+    public QuorumUtil(int n) throws RuntimeException {
+        this(n, 3);
+    }
+
     public PeerStruct getPeer(int id) {
         return peers.get(id);
     }



Mime
View raw message