hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r807486 - in /hadoop/zookeeper/branches/branch-3.2: ./ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Tue, 25 Aug 2009 05:47:20 GMT
Author: mahadev
Date: Tue Aug 25 05:47:20 2009
New Revision: 807486

URL: http://svn.apache.org/viewvc?rev=807486&view=rev
Log:
ZOOKEEPER-408. proposals and commits for DIFF and Truncate messages from the leader to followers
is buggy. (mahadev and ben via mahadev)

Modified:
    hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
    hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Tue Aug 25 05:47:20 2009
@@ -61,6 +61,9 @@
   ZOOKEEPER-498. Unending Leader Elections : WAN configuration (flavio via
   mahadev) 
 
+  ZOOKEEPER-408. proposals and commits for DIFF and Truncate messages from the 
+  leader to followers is buggy. (mahadev and ben via mahadev)
+
 IMPROVEMENTS:
 
 NEW FEATURES:

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Tue Aug 25 05:47:20 2009
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
@@ -271,8 +272,8 @@
      */
     public boolean truncate(long zxid) throws IOException {
         FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid);
-        FileInputStream input = itr.inputStream;
-        long pos = input.getChannel().position();
+        PositionInputStream input = itr.inputStream;
+        long pos = input.getPosition();
         // now, truncate at the current position
         RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
         raf.setLength(pos);
@@ -322,6 +323,48 @@
     }
 
     /**
+     * a class that keeps track of the position 
+     * in the input stream. The position points to offset
+     * that has been consumed by the applications. It can 
+     * wrap buffered input streams to provide the right offset 
+     * for the application.
+     */
+    static class PositionInputStream extends FilterInputStream {
+        long position; // number of bytes consumed so far through this stream
+        protected PositionInputStream(InputStream in) {
+            super(in);
+        }
+        // Every read/skip path below advances position only by bytes actually consumed.
+        @Override
+        public int read() throws IOException {
+            int rc = super.read();
+            if (rc >= 0) { // -1 is EOF; 0 is a valid byte value and must be counted
+                position++;
+            }
+            return rc;
+        }
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            int rc = super.read(b, off, len);
+            if (rc > 0) { // rc is -1 at EOF; adding it would move position backwards
+                position += rc;
+            }
+            return rc;
+        }
+        @Override
+        public long skip(long n) throws IOException {
+            long rc = super.skip(n);
+            if (rc > 0) {
+                position += rc;
+            }
+            return rc;
+        }
+        public long getPosition() { // offset of the next unread byte as seen by the caller
+            return position;
+        }
+    }
+    
+    /**
      * this class implements the txnlog iterator interface
      * which is used for reading the transaction logs
      */
@@ -333,7 +376,8 @@
         File logFile;
         InputArchive ia;
         static final String CRC_ERROR="CRC check failed";
-        FileInputStream inputStream=null;
+       
+        PositionInputStream inputStream=null;
         //stored files is the list of files greater than
         //the zxid we are looking for.
         private ArrayList<File> storedFiles;
@@ -398,7 +442,7 @@
          * @param is the inputstream
          * @throws IOException
          */
-        protected void inStreamCreated(InputArchive ia, FileInputStream is)
+        protected void inStreamCreated(InputArchive ia, InputStream is)
             throws IOException{
             FileHeader header= new FileHeader();
             header.deserialize(ia, "fileheader");
@@ -416,9 +460,9 @@
          **/
         protected InputArchive createInputArchive(File logFile) throws IOException {
             if(inputStream==null){
-                inputStream= new FileInputStream(logFile);
+                inputStream= new PositionInputStream(new BufferedInputStream(new FileInputStream(logFile)));
                 LOG.debug("Created new input stream " + logFile);
-                ia  = BinaryInputArchive.getArchive(new BufferedInputStream(inputStream));
+                ia  = BinaryInputArchive.getArchive(inputStream);
                 inStreamCreated(ia,inputStream);
                 LOG.debug("created new input archive " + logFile);
             }

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Aug 25 05:47:20 2009
@@ -200,7 +200,7 @@
                 readPacket(qp);
                 synchronized (zk) {
                     if (qp.getType() == Leader.DIFF) {
-                        LOG.info("Getting a diff from the leader!");
+                        LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
                         zk.loadData();
                     }
                     else if (qp.getType() == Leader.SNAP) {

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Tue Aug 25 05:47:20 2009
@@ -62,7 +62,7 @@
     
     long getSid(){
         return sid;
-    }
+    }                    
 
     /**
      * The packets to be sent to the follower
@@ -224,15 +224,23 @@
             } else {
             	this.sid = leader.followerCounter.getAndDecrement();
             }
-            LOG.info("The follower sid: " + this.sid);
+            LOG.info("Follower sid: " + this.sid + " : info : "
+                    + leader.self.quorumPeers.get(this.sid));
             
+            /* this is the last zxid from the follower but the leader might have to
+              restart the follower from a different zxid depending on truncate and diff. */
             long peerLastZxid = qp.getZxid();
-            
+            /* the default to send to the follower */
             int packetToSend = Leader.SNAP;
             boolean logTxns = true;
-
             long zxidToSend = 0;
-            // we are sending the diff
+            
+            /** the packets that the follower needs to get updates from **/
+            long updates = peerLastZxid;
+            
+            /* we are sending the diff check if we have proposals in memory to be able to
+             * send a diff to the 
+             */ 
             synchronized(leader.zk.committedLog) {
                 if (leader.zk.committedLog.size() != 0) {
                     if ((leader.zk.maxCommittedLog >= peerLastZxid)
@@ -252,16 +260,7 @@
                 }
                 else {
                     logTxns = false;
-                }            }
-            long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
-            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                    leaderLastZxid, null, null);
-            oa.writeRecord(newLeaderQP, "packet");
-            bufferedOutput.flush();
-            // a special case when both the ids are the same
-            if (peerLastZxid == leaderLastZxid) {
-                packetToSend = Leader.DIFF;
-                zxidToSend = leaderLastZxid;
+                }            
             }
             //check if we decided to send a diff or we need to send a truncate
             // we avoid using epochs for truncating because epochs make things
@@ -274,11 +273,31 @@
                 // we can ask the follower to truncate the log
                 packetToSend = Leader.TRUNC;
                 zxidToSend = leader.zk.maxCommittedLog;
-
+                updates = zxidToSend;
             }
+            
+            /* see what other packets from the proposal
+             * and tobeapplied queues need to be sent
+             * and then decide if we can just send a DIFF
+             * or we actually need to send the whole snapshot
+             */
+            long leaderLastZxid = leader.startForwarding(this, updates);
+            // a special case when both the ids are the same 
+            if (peerLastZxid == leaderLastZxid) {
+                packetToSend = Leader.DIFF;
+                zxidToSend = leaderLastZxid;
+            }
+
+            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                    leaderLastZxid, null, null);
+            oa.writeRecord(newLeaderQP, "packet");
+            bufferedOutput.flush();
+            
+           
             oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
             bufferedOutput.flush();
-            // only if we are not truncating or fast sycning
+            
+            /* if we are not truncating or sending a diff just send a snapshot */
             if (packetToSend == Leader.SNAP) {
                 LOG.warn("Sending snapshot last zxid of peer is 0x"
                         + Long.toHexString(peerLastZxid) + " " 
@@ -289,7 +308,7 @@
                 oa.writeString("BenWasHere", "signature");
             }
             bufferedOutput.flush();
-            //
+            
             // Mutation packets will be queued during the serialize,
             // so we need to mark when the follower can actually start
             // using the data

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Aug 25 05:47:20 2009
@@ -647,8 +647,7 @@
                 }
                 handler.queuePacket(p.packet);
                 // Since the proposal has been committed we need to send the
-                // commit message
-                // also
+                // commit message also
                 QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet
                         .getZxid(), null, null);
                 handler.queuePacket(qp);

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java Tue Aug 25 05:47:20 2009
@@ -43,13 +43,33 @@
             try {
                 follower.writePacket(qp, false);
             } catch (IOException e) {
-                LOG.warn("Ignoring unexpected exception during packet send", e);
+                LOG.warn("Closing connection to leader, exception during packet send", e);
+                try {
+                    if (!follower.sock.isClosed()) {
+                        follower.sock.close();
+                    }
+                } catch (IOException e1) {
+                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
+                    LOG.debug("Ignoring error closing the connection", e1);
+                }
             }
         }
     }
     
     public void flush() throws IOException {
-        follower.writePacket(null, true);
+        try {
+            follower.writePacket(null, true);
+        } catch(IOException e) {
+            LOG.warn("Closing connection to leader, exception during packet send", e);
+            try {
+                if (!follower.sock.isClosed()) {
+                    follower.sock.close();
+                }
+            } catch (IOException e1) {
+                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
+                    LOG.debug("Ignoring error closing the connection", e1);
+            }
+        }
     }
 
     public void shutdown() {

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Aug 25 05:47:20 2009
@@ -247,7 +247,7 @@
         File tmpFile = File.createTempFile("test", ".junit", parentDir);
         // don't delete tmpFile - this ensures we don't attempt to create
         // a tmpDir with a duplicate name
-        
+        tmpFile.delete();
         File tmpDir = new File(tmpFile + ".dir");
         assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
         assertTrue(tmpDir.mkdirs());
@@ -356,7 +356,7 @@
         return JMXEnv.conn();
     }
 
-    private static boolean recursiveDelete(File d) {
+    public static boolean recursiveDelete(File d) {
         if (d.isDirectory()) {
             File children[] = d.listFiles();
             for (File f : children) {

Modified: hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java Tue Aug 25 05:47:20 2009
@@ -25,12 +25,16 @@
 import junit.framework.TestCase;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +55,7 @@
     protected void tearDown() throws Exception {
         qb.tearDown();
     }
-
+    
     @Test
     public void testDeleteWithChildren() throws Exception {
         ct.testDeleteWithChildren();
@@ -92,6 +96,7 @@
     {
         ct.testClientWithWatcherObj();
     }
+    
     @Test
     public void testMultipleWatcherObjs() throws IOException,
             InterruptedException, KeeperException
@@ -99,6 +104,56 @@
         ct.testMutipleWatcherObjs();
     }
     
+    volatile int counter = 0;
+    volatile int errors = 0;
+    @Test // drives async writes before and after shutting down input on the leader's follower sockets
+    public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) {
+        }}); // no-op watcher: watch events are ignored by this test
+        zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Leader leader = qb.s1.leader; // probe s1..s5 in order for the first non-null leader field
+        if (leader == null) leader = qb.s2.leader;
+        if (leader == null) leader = qb.s3.leader;
+        if (leader == null) leader = qb.s4.leader;
+        if (leader == null) leader = qb.s5.leader;
+        assertNotNull(leader);
+        for(int i = 0; i < 5000; i++) { // first batch of async setData calls, version -1
+            zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++; // counter/errors are only tallied; this method never asserts on them
+                    if (rc != 0) {
+                        errors++;
+                    }
+                }
+            }, null);
+        }
+        ArrayList<FollowerHandler> fhs = new ArrayList<FollowerHandler>(leader.forwardingFollowers); // copy before iterating
+        for(FollowerHandler f: fhs) {
+            f.sock.shutdownInput(); // shut down the input side of each handler's socket on the leader
+        }
+        for(int i = 0; i < 5000; i++) { // second batch, issued after the sockets were disturbed
+            zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                }
+            }, null);
+        }
+        // check that all five quorum peers (s1..s5) still report alive
+        assertTrue(qb.s1.isAlive());
+        assertTrue(qb.s2.isAlive());
+        assertTrue(qb.s3.isAlive());
+        assertTrue(qb.s4.isAlive());
+        assertTrue(qb.s5.isAlive());
+        zk.close();
+    }
+
     /**
      * Make sure that we can change sessions
      *  from follower to leader.
@@ -171,6 +226,5 @@
         }
         zk.close();
     }
-
-    // skip superhammer and clientcleanup as they are too expensive for quorum
+	// skip superhammer and clientcleanup as they are too expensive for quorum
 }



Mime
View raw message