zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cami...@apache.org
Subject svn commit: r1357711 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/Leader.java src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Date Thu, 05 Jul 2012 16:01:27 GMT
Author: camille
Date: Thu Jul  5 16:01:26 2012
New Revision: 1357711

URL: http://svn.apache.org/viewvc?rev=1357711&view=rev
Log:
ZOOKEEPER-1465. Cluster availability following new leader election 
    takes a long time with large datasets - is correlated to dataset size
    (fpj and Thawan Kooburat via camille)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1357711&r1=1357710&r2=1357711&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jul  5 16:01:26 2012
@@ -202,6 +202,10 @@ BUGFIXES:
 
   ZOOKEEPER-1471. Jute generates invalid C++ code
     (Michi Mutsuzaki via phunt)
+    
+  ZOOKEEPER-1465. Cluster availability following new leader election 
+    takes a long time with large datasets - is correlated to dataset size
+    (fpj and Thawan Kooburat via camille)
 
 IMPROVEMENTS:
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1357711&r1=1357710&r2=1357711&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jul  5 16:01:26 2012
@@ -881,7 +881,11 @@ public class Leader {
             }
             if (ss.getCurrentEpoch() != -1) {
                 if (ss.isMoreRecentThan(leaderStateSummary)) {
-                    throw new IOException("Follower is ahead of the leader");
+                    throw new IOException("Follower is ahead of the leader, leader summary: "
+                                                    + leaderStateSummary.getCurrentEpoch()
+                                                    + " (current epoch), "
+                                                    + leaderStateSummary.getLastZxid()
+                                                    + " (last zxid)");
                 }
                 electingFollowers.add(id);
             }
@@ -903,4 +907,50 @@ public class Leader {
             }
         }
     }
+
+    /**
+     * Get string representation of a given packet type
+     * @param packetType
+     * @return string representing the packet type
+     */
+    public static String getPacketType(int packetType) {
+        switch (packetType) {
+        case DIFF:
+            return "DIFF";
+        case TRUNC:
+            return "TRUNC";
+        case SNAP:
+            return "SNAP";
+        case OBSERVERINFO:
+            return "OBSERVERINFO";
+        case NEWLEADER:
+            return "NEWLEADER";
+        case FOLLOWERINFO:
+            return "FOLLOWERINFO";
+        case UPTODATE:
+            return "UPTODATE";
+        case LEADERINFO:
+            return "LEADERINFO";
+        case ACKEPOCH:
+            return "ACKEPOCH";
+        case REQUEST:
+            return "REQUEST";
+        case PROPOSAL:
+            return "PROPOSAL";
+        case ACK:
+            return "ACK";
+        case COMMIT:
+            return "COMMIT";
+        case PING:
+            return "PING";
+        case REVALIDATE:
+            return "REVALIDATE";
+        case SYNC:
+            return "SYNC";
+        case INFORM:
+            return "INFORM";
+        default:
+            return "UNKNOWN";
+        }
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1357711&r1=1357710&r2=1357711&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Jul  5 16:01:26 2012
@@ -342,6 +342,12 @@ public class LearnerHandler extends Thre
                         // whether to expect a trunc or a diff
                         boolean firstPacket=true;
 
+                        // If we are here, we can use committedLog to sync with
+                        // follower. Then we only need to decide whether to
+                        // send trunc or not
+                        packetToSend = Leader.DIFF;
+                        zxidToSend = maxCommittedLog;
+
                         for (Proposal propose: proposals) {
                             // skip the proposals the peer already has
                             if (propose.packet.getZxid() <= peerLastZxid) {
@@ -356,17 +362,9 @@ public class LearnerHandler extends Thre
                                     if (prevProposalZxid < peerLastZxid) {
                                         // send a trunc message before sending the diff
                                         packetToSend = Leader.TRUNC;
-                                        LOG.info("Sending TRUNC");
                                         zxidToSend = prevProposalZxid;
                                         updates = zxidToSend;
                                     }
-                                    else {
-                                        // Just send the diff
-                                        packetToSend = Leader.DIFF;
-                                        LOG.info("Sending diff");
-                                        zxidToSend = maxCommittedLog;
-                                    }
-
                                 }
                                 queuePacket(propose.packet);
                                 QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
@@ -385,19 +383,23 @@ public class LearnerHandler extends Thre
                     } else {
                         LOG.warn("Unhandled proposal scenario");
                     }
+                } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
+                    // The leader may recently take a snapshot, so the committedLog
+                    // is empty. We don't need to send snapshot if the follow
+                    // is already sync with in-memory db.
+                    LOG.debug("committedLog is empty but leader and follower "
+                            + "are in sync, zxid=0x{}",
+                            Long.toHexString(peerLastZxid));
+                    packetToSend = Leader.DIFF;
+                    zxidToSend = peerLastZxid;
                 } else {
                     // just let the state transfer happen
                     LOG.debug("proposals is empty");
                 }               
 
+                LOG.info("Sending " + Leader.getPacketType(packetToSend));
                 leaderLastZxid = leader.startForwarding(this, updates);
-                if (peerLastZxid == leaderLastZxid) {
-                    LOG.debug("Leader and follower are in sync, sending empty diff. zxid=0x{}",
-                            Long.toHexString(leaderLastZxid));
-                    // We are in sync so we'll do an empty diff
-                    packetToSend = Leader.DIFF;
-                    zxidToSend = leaderLastZxid;
-                }
+
             } finally {
                 rl.unlock();
             }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1357711&r1=1357710&r2=1357711&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Jul  5 16:01:26 2012
@@ -49,6 +49,7 @@ import org.apache.zookeeper.server.ZKDat
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
@@ -307,6 +308,10 @@ public class Zab1_0Test {
         void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws Exception;
     }
     
+    static public interface PopulatedLeaderConversation {
+        void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid) throws Exception;
+    }
+    
     static public interface FollowerConversation {
         void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
     }
@@ -353,6 +358,76 @@ public class Zab1_0Test {
         }
     }
     
+    public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception {
+        Socket pair[] = getSocketPair();
+        Socket leaderSocket = pair[0];
+        Socket followerSocket = pair[1];
+        File tmpDir = File.createTempFile("test", "dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        LeadThread leadThread = null;
+        Leader leader = null;
+        try {              
+            // Setup a database with two znodes
+            FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+            ZKDatabase zkDb = new ZKDatabase(snapLog);
+            
+            Assert.assertTrue(ops >= 1);
+            long zxid = ZxidUtils.makeZxid(1, 0);            
+            for(int i = 1; i <= ops; i++){
+                zxid = ZxidUtils.makeZxid(1, i);
+                String path = "/foo-"+ i;
+                zkDb.processTxn(new TxnHeader(13,1000+i,zxid,30+i,ZooDefs.OpCode.create),
+                                                new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                Stat stat = new Stat();
+                Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null)));
+            }                
+            Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
+            
+            // Generate snapshot and close files.
+            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+            snapLog.close();
+            
+            QuorumPeer peer = createQuorumPeer(tmpDir);
+                        
+            leader = createLeader(tmpDir, peer);
+            peer.leader = leader;
+            
+            // Set the last accepted epoch and current epochs to be 1
+            peer.setAcceptedEpoch(1);
+            peer.setCurrentEpoch(1);
+
+            
+            leadThread = new LeadThread(leader);
+            leadThread.start();
+
+            while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
+                Thread.sleep(20);
+            }
+            
+            LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+            lh.start();
+            leaderSocket.setSoTimeout(4000);
+
+            InputArchive ia = BinaryInputArchive.getArchive(followerSocket
+                    .getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket
+                    .getOutputStream());
+
+            conversation.converseWithLeader(ia, oa, leader, zxid);
+        } finally {
+            recursiveDelete(tmpDir);
+            if (leader != null) {
+                leader.shutdown("end of test");
+            }
+            if (leadThread != null) {
+                leadThread.interrupt();
+                leadThread.join();
+            }
+        }
+    }
+    
+    
     public void testFollowerConversation(FollowerConversation conversation) throws Exception {
         File tmpDir = File.createTempFile("test", "dir");
         tmpDir.delete();
@@ -404,6 +479,46 @@ public class Zab1_0Test {
     }
 
     @Test
+    public void testUnnecessarySnap() throws Exception {
+        testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
+           @Override
+           public void converseWithLeader(InputArchive ia, OutputArchive oa,
+                    Leader l, long zxid) throws Exception {
+               
+               Assert.assertEquals(1, l.self.getAcceptedEpoch());
+               Assert.assertEquals(1, l.self.getCurrentEpoch());
+               
+               /* we test a normal run. everything should work out well. */
+               LearnerInfo li = new LearnerInfo(1, 0x10000);
+               byte liBytes[] = new byte[12];
+               ByteBufferOutputStream.record2ByteBuffer(li,
+                       ByteBuffer.wrap(liBytes));
+               QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
+                       liBytes, null);
+               oa.writeRecord(qp, null);
+               
+               readPacketSkippingPing(ia, qp);
+               Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+               Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
+               Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+                       0x10000);
+               Assert.assertEquals(2, l.self.getAcceptedEpoch());
+               Assert.assertEquals(1, l.self.getCurrentEpoch());
+               
+               byte epochBytes[] = new byte[4];
+               final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+               wrappedEpochBytes.putInt(1);
+               qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
+               oa.writeRecord(qp, null);
+               
+               readPacketSkippingPing(ia, qp);
+               Assert.assertEquals(Leader.DIFF, qp.getType());
+           
+           }
+       }, 2);
+    }
+    
+    @Test
     public void testNormalFollowerRun() throws Exception {
         testFollowerConversation(new FollowerConversation() {
             @Override
@@ -686,9 +801,8 @@ public class Zab1_0Test {
                 oa.writeRecord(qp, null);
                 
                 readPacketSkippingPing(ia, qp);
-                Assert.assertEquals(Leader.SNAP, qp.getType());
-                deserializeSnapshot(ia);
-               
+                Assert.assertEquals(Leader.DIFF, qp.getType());
+
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.NEWLEADER, qp.getType());
                 Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
@@ -743,9 +857,7 @@ public class Zab1_0Test {
                 qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
                 oa.writeRecord(qp, null);
                 readPacketSkippingPing(ia, qp);
-                Assert.assertEquals(Leader.SNAP, qp.getType());
-                deserializeSnapshot(ia);
-
+                Assert.assertEquals(Leader.DIFF, qp.getType());
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.NEWLEADER, qp.getType());
                 Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());



Mime
View raw message