hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r831371 [1/2] - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/config/ src/java/test/org/apache/zookeeper/test/
Date Fri, 30 Oct 2009 16:19:14 GMT
Author: mahadev
Date: Fri Oct 30 16:19:13 2009
New Revision: 831371

URL: http://svn.apache.org/viewvc?rev=831371&view=rev
Log:
ZOOKEEPER-549. Refactor Followers and related classes into a Peer->Follower hierarchy in preparation for Observers (henry robinson via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
Removed:
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSessionTracker.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderBean.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Oct 30 16:19:13 2009
@@ -115,6 +115,9 @@
   ZOOKEEPER-530. Memory corruption: Zookeeper c client IPv6 implementation
   does not honor struct sockaddr_in6 size (isabel drost via mahadev)
 
+  ZOOKEEPER-549. Refactor Followers and related classes into a Peer->Follower
+  hierarchy in preparation for Observers (henry robinson via mahadev)
+
 NEW FEATURES:
   ZOOKEEPER-539. generate eclipse project via ant target. (phunt via mahadev)
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Oct 30 16:19:13 2009
@@ -18,52 +18,28 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This class has the control logic for the Follower.
  */
-public class Follower {
-    private static final Logger LOG = Logger.getLogger(Follower.class);
-
-    static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
-    static {
-        LOG.info("TCP NoDelay set to: " + nodelay);
-    }
-
-    QuorumPeer self;
-
-    FollowerZooKeeperServer zk;
+public class Follower extends Learner{
 
+    private long lastQueued;
+    // This is the same object as this.zk, but we cache the downcast op
+    FollowerZooKeeperServer fzk = null;
+    
     Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
         this.self = self;
         this.zk=zk;
+        this.fzk = zk;
     }
 
     @Override
@@ -76,235 +52,24 @@
         return sb.toString();
     }
 
-    private InputArchive leaderIs;
-
-    private OutputArchive leaderOs;
-
-    private BufferedOutputStream bufferedOutput;
-
-    public Socket sock;
-
-    /**
-     * write a packet to the leader
-     *
-     * @param pp
-     *                the proposal packet to be sent to the leader
-     * @throws IOException
-     */
-    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
-        synchronized (leaderOs) {
-            if (pp != null) {
-                leaderOs.writeRecord(pp, "packet");
-            }
-            if (flush) {
-                bufferedOutput.flush();
-            }
-        }
-    }
-
-    /**
-     * read a packet from the leader
-     *
-     * @param pp
-     *                the packet to be instantiated
-     * @throws IOException
-     */
-    void readPacket(QuorumPacket pp) throws IOException {
-        synchronized (leaderIs) {
-            leaderIs.readRecord(pp, "packet");
-        }
-        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-        if (pp.getType() == Leader.PING) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        if (LOG.isTraceEnabled()) {
-            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
-        }
-    }
-
     /**
      * the main method called by the follower to follow the leader
      *
      * @throws InterruptedException
      */
     void followLeader() throws InterruptedException {
-        zk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
-
-        try {
-            InetSocketAddress addr = null;
-            // Find the leader by id
-            Vote current = self.getCurrentVote();
-            for (QuorumServer s : self.quorumPeers.values()) {
-                if (s.id == current.id) {
-                    addr = s.addr;
-                    break;
-                }
-            }
-            if (addr == null) {
-                LOG.warn("Couldn't find the leader with id = "
-                        + current.id);
-            }
-            LOG.info("Following " + addr);
-            sock = new Socket();
+        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
+        try {            
+            InetSocketAddress addr = findLeader();            
             try {
-                QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
-                sock.setSoTimeout(self.tickTime * self.initLimit);
-                for (int tries = 0; tries < 5; tries++) {
-                    try {
-                        //sock = new Socket();
-                        //sock.setSoTimeout(self.tickTime * self.initLimit);
-                        sock.connect(addr, self.tickTime * self.syncLimit);
-                        sock.setTcpNoDelay(nodelay);
-                        break;
-                    } catch (IOException e) {
-                        if (tries == 4) {
-                            LOG.error("Unexpected exception",e);
-                            throw e;
-                        } else {
-                            LOG.warn("Unexpected exception, tries="+tries,e);
-                            sock = new Socket();
-                            sock.setSoTimeout(self.tickTime * self.initLimit);
-                        }
-                    }
-                    Thread.sleep(1000);
-                }
-                leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
-                        sock.getInputStream()));
-                bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
-                leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
-                
-                /*
-                 * Send follower info, including last zxid and sid
-                 */
+                connectToLeader(addr);
+                long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
+                syncWithLeader(newLeaderZxid);                
                 QuorumPacket qp = new QuorumPacket();
-                qp.setType(Leader.FOLLOWERINFO);
-                long sentLastZxid = self.getLastLoggedZxid();
-                qp.setZxid(sentLastZxid);
-                
-                /*
-                 * Add sid to payload
-                 */
-                ByteArrayOutputStream bsid = new ByteArrayOutputStream();
-                DataOutputStream dsid = new DataOutputStream(bsid);
-                dsid.writeLong(self.getId());
-                qp.setData(bsid.toByteArray());
-                
-                writePacket(qp, true);
-                readPacket(qp);
-                long newLeaderZxid = qp.getZxid();
-    
-                if (qp.getType() != Leader.NEWLEADER) {
-                    LOG.error("First packet should have been NEWLEADER");
-                    throw new IOException("First packet should have been NEWLEADER");
-                }
-                readPacket(qp);
-                synchronized (zk) {
-                    if (qp.getType() == Leader.DIFF) {
-                        LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
-                        zk.loadData();
-                    }
-                    else if (qp.getType() == Leader.SNAP) {
-                        LOG.info("Getting a snapshot from leader");
-                        // The leader is going to dump the database
-                        zk.deserializeSnapshot(leaderIs);
-                        String signature = leaderIs.readString("signature");
-                        if (!signature.equals("BenWasHere")) {
-                            LOG.error("Missing signature. Got " + signature);
-                            throw new IOException("Missing signature");
-                        }
-                    } else if (qp.getType() == Leader.TRUNC) {
-                        //we need to truncate the log to the lastzxid of the leader
-                        LOG.warn("Truncating log to get in sync with the leader 0x"
-                                + Long.toHexString(qp.getZxid()));
-                        boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
-                        if (!truncated) {
-                            // not able to truncate the log
-                            LOG.fatal("Not able to truncate the log "
-                                    + Long.toHexString(qp.getZxid()));
-                            System.exit(13);
-                        }
-    
-                        zk.loadData();
-                    }
-                    else {
-                        LOG.fatal("Got unexpected packet from leader "
-                                + qp.getType() + " exiting ... " );
-                        System.exit(13);
-
-                    }
-                    zk.dataTree.lastProcessedZxid = newLeaderZxid;
-                }
-                ack.setZxid(newLeaderZxid & ~0xffffffffL);
-                writePacket(ack, true);
-                sock.setSoTimeout(self.tickTime * self.syncLimit);
-                zk.startup();
-                
                 while (self.running) {
                     readPacket(qp);
-                    switch (qp.getType()) {
-                    case Leader.PING:
-                        // Send back the ping with our session data
-                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                        DataOutputStream dos = new DataOutputStream(bos);
-                        HashMap<Long, Integer> touchTable = zk
-                                .getTouchSnapshot();
-                        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
-                            dos.writeLong(entry.getKey());
-                            dos.writeInt(entry.getValue());
-                        }
-                        qp.setData(bos.toByteArray());
-                        writePacket(qp, true);
-                        break;
-                    case Leader.PROPOSAL:
-                        TxnHeader hdr = new TxnHeader();
-                        BinaryInputArchive ia = BinaryInputArchive
-                                .getArchive(new ByteArrayInputStream(qp.getData()));
-                        Record txn = SerializeUtils.deserializeTxn(ia, hdr);
-                        if (hdr.getZxid() != lastQueued + 1) {
-                            LOG.warn("Got zxid 0x"
-                                    + Long.toHexString(hdr.getZxid())
-                                    + " expected 0x"
-                                    + Long.toHexString(lastQueued + 1));
-                        }
-                        lastQueued = hdr.getZxid();
-                        zk.logRequest(hdr, txn);
-                        break;
-                    case Leader.COMMIT:
-                        zk.commit(qp.getZxid());
-                        break;
-                    case Leader.UPTODATE:
-                        zk.takeSnapshot();
-                        self.cnxnFactory.setZooKeeperServer(zk);
-                        break;
-                    case Leader.REVALIDATE:
-                        ByteArrayInputStream bis = new ByteArrayInputStream(qp
-                                .getData());
-                        DataInputStream dis = new DataInputStream(bis);
-                        long sessionId = dis.readLong();
-                        boolean valid = dis.readBoolean();
-                        synchronized (pendingRevalidations) {
-                            ServerCnxn cnxn = pendingRevalidations
-                                    .remove(sessionId);
-                            if (cnxn == null) {
-                                LOG.warn("Missing session 0x"
-                                        + Long.toHexString(sessionId)
-                                        + " for validation");
-                            } else {
-                                cnxn.finishSessionInit(valid);
-                            }
-                        }
-                        if (LOG.isTraceEnabled()) {
-                            ZooTrace.logTraceMessage(LOG,
-                                    ZooTrace.SESSION_TRACE_MASK,
-                                    "Session 0x" + Long.toHexString(sessionId)
-                                    + " is valid: " + valid);
-                        }
-                        break;
-                    case Leader.SYNC:
-                        zk.sync();
-                        break;
-                    }
-                }
+                    processPacket(qp);                   
+                }                              
             } catch (IOException e) {
                 LOG.warn("Exception when following the leader", e);
                 try {
@@ -314,95 +79,65 @@
                 }
     
                 synchronized (pendingRevalidations) {
-                    // clear pending revalitions
+                    // clear pending revalidations
                     pendingRevalidations.clear();
                     pendingRevalidations.notifyAll();
                 }
             }
         } finally {
-            zk.unregisterJMX(this);
+            zk.unregisterJMX((Learner)this);
         }
     }
 
-    private long lastQueued;
-
-    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations =
-        new ConcurrentHashMap<Long, ServerCnxn>();
-    
-    public int getPendingRevalidationsCount() {
-        return pendingRevalidations.size();
-    }
-
     /**
-     * validate a seesion for a client
-     *
-     * @param clientId
-     *                the client to be revailidated
-     * @param timeout
-     *                the timeout for which the session is valid
-     * @return
+     * Examine the packet received in qp and dispatch based on its contents.
+     * @param qp
      * @throws IOException
-     * @throws InterruptedException
      */
-    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
-            throws IOException, InterruptedException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        dos.writeLong(clientId);
-        dos.writeInt(timeout);
-        dos.close();
-        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
-                .toByteArray(), null);
-        pendingRevalidations.put(clientId, cnxn);
-        if (LOG.isTraceEnabled()) {
-            ZooTrace.logTraceMessage(LOG,
-                                     ZooTrace.SESSION_TRACE_MASK,
-                                     "To validate session 0x"
-                                     + Long.toHexString(clientId));
+    protected void processPacket(QuorumPacket qp) throws IOException{
+        switch (qp.getType()) {
+        case Leader.PING:            
+            ping(qp);            
+            break;
+        case Leader.PROPOSAL:            
+            TxnHeader hdr = new TxnHeader();
+            BinaryInputArchive ia = BinaryInputArchive
+            .getArchive(new ByteArrayInputStream(qp.getData()));
+            Record txn = SerializeUtils.deserializeTxn(ia, hdr);
+            if (hdr.getZxid() != lastQueued + 1) {
+                LOG.warn("Got zxid 0x"
+                        + Long.toHexString(hdr.getZxid())
+                        + " expected 0x"
+                        + Long.toHexString(lastQueued + 1));
+            }
+            lastQueued = hdr.getZxid();
+            fzk.logRequest(hdr, txn);
+            break;
+        case Leader.COMMIT:
+            fzk.commit(qp.getZxid());
+            break;
+        case Leader.UPTODATE:
+            fzk.takeSnapshot();
+            self.cnxnFactory.setZooKeeperServer(fzk);
+            break;
+        case Leader.REVALIDATE:
+            revalidate(qp);
+            break;
+        case Leader.SYNC:
+            fzk.sync();
+            break;
         }
-        writePacket(qp, true);
     }
 
+
     /**
-     * send a request packet to the leader
-     *
-     * @param request
-     *                the request from the client
-     * @throws IOException
+     * The zxid of the last operation seen
+     * @return zxid
      */
-    void request(Request request) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream oa = new DataOutputStream(baos);
-        oa.writeLong(request.sessionId);
-        oa.writeInt(request.cxid);
-        oa.writeInt(request.type);
-        if (request.request != null) {
-            request.request.rewind();
-            int len = request.request.remaining();
-            byte b[] = new byte[len];
-            request.request.get(b);
-            request.request.rewind();
-            oa.write(b);
-        }
-        oa.close();
-        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
-                .toByteArray(), request.authInfo);
-//        QuorumPacket qp;
-//        if(request.type == OpCode.sync){
-//            qp = new QuorumPacket(Leader.SYNC, -1, baos
-//                    .toByteArray(), request.authInfo);
-//        }
-//        else{
-//        qp = new QuorumPacket(Leader.REQUEST, -1, baos
-//                .toByteArray(), request.authInfo);
-//        }
-        writePacket(qp, true);
-    }
-
     public long getZxid() {
         try {
-            synchronized (zk) {
-                return zk.getZxid();
+            synchronized (fzk) {
+                return fzk.getZxid();
             }
         } catch (NullPointerException e) {
             LOG.warn("error getting zxid", e);
@@ -410,20 +145,17 @@
         return -1;
     }
     
-    public long getLastQueued() {
+    /**
+     * The zxid of the last operation queued
+     * @return zxid
+     */
+    protected long getLastQueued() {
         return lastQueued;
     }
 
-    public void shutdown() {
-        // set the zookeeper server to null
-        self.cnxnFactory.setZooKeeperServer(null);
-        // clear all the connections
-        self.cnxnFactory.clear();
-        // shutdown previous zookeeper
-        if (zk != null) {
-            zk.shutdown();
-
-        }
+    @Override
+    public void shutdown() {    
         LOG.info("shutdown called", new Exception("shutdown Follower"));
+        super.shutdown();
     }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Fri Oct 30 16:19:13 2009
@@ -19,20 +19,15 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -41,13 +36,11 @@
  * processors: FollowerRequestProcessor -> CommitProcessor ->
  * FinalRequestProcessor
  * 
- * A SyncRequestProcessor is also spawn off to log proposals from the leader.
+ * A SyncRequestProcessor is also spawned off to log proposals from the leader.
  */
-public class FollowerZooKeeperServer extends ZooKeeperServer {
+public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     private static final Logger LOG = Logger.getLogger(FollowerZooKeeperServer.class);
 
-    private QuorumPeer self;
-
     CommitProcessor commitProcessor;
 
     SyncRequestProcessor syncProcessor;
@@ -65,19 +58,13 @@
     FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
             DataTreeBuilder treeBuilder) throws IOException {
         super(logFactory, self.tickTime,treeBuilder);
-        this.self = self;
+        this.self = self;        
         this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
     }
 
     public Follower getFollower(){
         return self.follower;
-    }
-    
-    @Override
-    protected void createSessionTracker() {
-        sessionTracker = new FollowerSessionTracker(this, sessionsWithTimeouts,
-                self.getId());
-    }
+    }      
 
     @Override
     protected void setupRequestProcessors() {
@@ -88,28 +75,10 @@
         firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
         ((FollowerRequestProcessor) firstProcessor).start();
         syncProcessor = new SyncRequestProcessor(this,
-                new SendAckRequestProcessor(getFollower()));
+                new SendAckRequestProcessor((Learner)getFollower()));
         syncProcessor.start();
     }
-
-    @Override
-    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-            int sessionTimeout) throws IOException, InterruptedException {
-        getFollower().validateSession(cnxn, sessionId, sessionTimeout);
-    }
-
-    public HashMap<Long, Integer> getTouchSnapshot() {
-        if (sessionTracker != null) {
-            return ((FollowerSessionTracker) sessionTracker).snapshot();
-        }
-        return new HashMap<Long, Integer>();
-    }
-
-    @Override
-    public long getServerId() {
-        return self.getId();
-    }
-
+    
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
 
     public void logRequest(TxnHeader hdr, Record txn) {
@@ -124,6 +93,12 @@
         syncProcessor.processRequest(request);
     }
 
+    /**
+     * When a COMMIT message is received, eventually this method is called, 
+     * which matches up the zxid from the COMMIT with (hopefully) the head of
+     * the pendingTxns queue and hands it to the commitProcessor to commit.
+     * @param zxid - must correspond to the head of pendingTxns if it exists
+     */
     public void commit(long zxid) {
         if (pendingTxns.size() == 0) {
             LOG.warn("Committing " + Long.toHexString(zxid)
@@ -155,14 +130,6 @@
     public int getGlobalOutstandingLimit() {
         return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
     }
-
-    /**
-     * Do not do anything in the follower.
-     */
-    @Override
-    public void addCommittedProposal(Request r) {
-        //do nothing
-    }
     
     @Override
     public void shutdown() {
@@ -180,69 +147,14 @@
                     e);
         }
     }
-
-
-    @Override
-    protected void registerJMX() {
-        // register with JMX
-        try {
-            jmxDataTreeBean = new DataTreeBean(dataTree);
-            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            jmxDataTreeBean = null;
-        }
-    }
-
-    public void registerJMX(FollowerBean followerBean,
-            LocalPeerBean localPeerBean)
-    {
-        // register with JMX
-        if (self.jmxLeaderElectionBean != null) {
-            try {
-                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
-            } catch (Exception e) {
-                LOG.warn("Failed to register with JMX", e);
-            }
-            self.jmxLeaderElectionBean = null;
-        }
-
-        try {
-            jmxServerBean = followerBean;
-            MBeanRegistry.getInstance().register(followerBean, localPeerBean);
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            jmxServerBean = null;
-        }
-    }
-
-    @Override
-    protected void unregisterJMX() {
-        // unregister from JMX
-        try {
-            if (jmxDataTreeBean != null) {
-                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to unregister with JMX", e);
-        }
-        jmxDataTreeBean = null;
-    }
-
-    protected void unregisterJMX(Follower follower) {
-        // unregister from JMX
-        try {
-            if (jmxServerBean != null) {
-                MBeanRegistry.getInstance().unregister(jmxServerBean);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to unregister with JMX", e);
-        }
-        jmxServerBean = null;
-    }
     
     @Override
     public String getState() {
         return "follower";
     }
+
+    @Override
+    public Learner getLearner() {
+        return getFollower();
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Oct 30 16:19:13 2009
@@ -72,48 +72,48 @@
     QuorumPeer self;
 
     // the follower acceptor thread
-    FollowerCnxAcceptor cnxAcceptor;
+    LearnerCnxAcceptor cnxAcceptor;
     
     // list of all the followers
-    public HashSet<FollowerHandler> followers = new HashSet<FollowerHandler>();
+    public HashSet<LearnerHandler> learners = new HashSet<LearnerHandler>();
 
-    // list of followers that are ready to follow (i.e synced with the leader)
-    public HashSet<FollowerHandler> forwardingFollowers = new HashSet<FollowerHandler>();
-    
+    // list of followers that are ready to follow (i.e synced with the leader)    
+    public HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
+        
     //Pending sync requests
-    public HashMap<Long,List<FollowerSyncRequest>> pendingSyncs = new HashMap<Long,List<FollowerSyncRequest>>();
+    public HashMap<Long,List<LearnerSyncRequest>> pendingSyncs = new HashMap<Long,List<LearnerSyncRequest>>();
     
     //Follower counter
     AtomicLong followerCounter = new AtomicLong(-1);
     /**
-     * Adds follower to the leader.
+     * Adds peer to the leader.
      * 
-     * @param follower
-     *                instance of follower handle
+     * @param learner
+     *                instance of learner handle
      */
-    void addFollowerHandler(FollowerHandler follower) {
-        synchronized (followers) {
-            followers.add(follower);
+    void addLearnerHandler(LearnerHandler learner) {
+        synchronized (learners) {
+            learners.add(learner);
         }
     }
 
     /**
-     * Remove the follower from the followers list
+     * Remove the learner from the learner list
      * 
-     * @param follower
+     * @param peer
      */
-    void removeFollowerHandler(FollowerHandler follower) {
+    void removeLearnerHandler(LearnerHandler peer) {
         synchronized (forwardingFollowers) {
-            forwardingFollowers.remove(follower);
-        }
-        synchronized (followers) {
-            followers.remove(follower);
+            forwardingFollowers.remove(peer);            
+        }        
+        synchronized (learners) {
+            learners.remove(peer);
         }
     }
 
-    boolean isFollowerSynced(FollowerHandler follower){
+    boolean isLearnerSynced(LearnerHandler peer){
         synchronized (forwardingFollowers) {
-            return forwardingFollowers.contains(follower);
+            return forwardingFollowers.contains(peer);
         }        
     }
     
@@ -209,7 +209,7 @@
 
     Proposal newLeaderProposal = new Proposal();
     
-    class FollowerCnxAcceptor extends Thread{
+    class LearnerCnxAcceptor extends Thread{
         private volatile boolean stop = false;
         
         @Override
@@ -217,10 +217,10 @@
             try {
                 while (!stop) {
                     try{
-                        Socket s = ss.accept();
+                        Socket s = ss.accept();                        
                         s.setSoTimeout(self.tickTime * self.syncLimit);
                         s.setTcpNoDelay(nodelay);
-                        FollowerHandler fh = new FollowerHandler(s, Leader.this);
+                        LearnerHandler fh = new LearnerHandler(s, Leader.this);
                         fh.start();
                     } catch (SocketException e) {
                         if (stop) {
@@ -267,9 +267,10 @@
             synchronized(this){
                 lastProposed = zk.getZxid();
             }
-            
+                      
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
-                    null, null);
+                    null, null);            
+
             if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                 LOG.info("NEWLEADER proposal has Zxid of "
                         + Long.toHexString(newLeaderProposal.packet.getZxid()));
@@ -278,7 +279,7 @@
             
             // Start thread that waits for connection requests from 
             // new followers.
-            cnxAcceptor = new FollowerCnxAcceptor();
+            cnxAcceptor = new LearnerCnxAcceptor();
             cnxAcceptor.start();
             
             // We have to get at least a majority of servers in sync with
@@ -296,7 +297,7 @@
                     
                     shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
                     HashSet<Long> followerSet = new HashSet<Long>();
-                    for(FollowerHandler f : followers)
+                    for(LearnerHandler f : learners)
                         followerSet.add(f.getSid());
                     
                     if (self.getQuorumVerifier().containsQuorum(followerSet)) {
@@ -333,8 +334,8 @@
                 
                 // lock on the followers when we use it.
                 syncedSet.add(self.getId());
-                synchronized (followers) {
-                    for (FollowerHandler f : followers) {
+                synchronized (learners) {
+                    for (LearnerHandler f : learners) {
                         if (f.synced()) {
                             syncedCount++;
                             syncedSet.add(f.getSid());
@@ -388,10 +389,10 @@
         } catch (IOException e) {
             LOG.warn("Ignoring unexpected exception during close",e);
         }
-        synchronized (followers) {
-            for (Iterator<FollowerHandler> it = followers.iterator(); it
+        synchronized (learners) {
+            for (Iterator<LearnerHandler> it = learners.iterator(); it
                     .hasNext();) {
-                FollowerHandler f = it.next();
+                LearnerHandler f = it.next();
                 it.remove();
                 f.shutdown();
             }
@@ -466,7 +467,7 @@
                 commit(zxid);
                 zk.commitProcessor.commit(p.request);
                 if(pendingSyncs.containsKey(zxid)){
-                    for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
+                    for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                         sendSync(r);
                     }
                 }
@@ -538,12 +539,12 @@
      */
     void sendPacket(QuorumPacket qp) {
         synchronized (forwardingFollowers) {
-            for (FollowerHandler f : forwardingFollowers) {
+            for (LearnerHandler f : forwardingFollowers) {                
                 f.queuePacket(qp);
             }
         }
     }
-
+    
     long lastCommitted = -1;
 
     /**
@@ -604,13 +605,13 @@
      * @param r the request
      */
     
-    synchronized public void processSync(FollowerSyncRequest r){
+    synchronized public void processSync(LearnerSyncRequest r){
         if(outstandingProposals.isEmpty()){
             sendSync(r);
         } else {
-            List<FollowerSyncRequest> l = pendingSyncs.get(lastProposed);
+            List<LearnerSyncRequest> l = pendingSyncs.get(lastProposed);
             if (l == null) {
-                l = new ArrayList<FollowerSyncRequest>();
+                l = new ArrayList<LearnerSyncRequest>();
             }
             l.add(r);
             pendingSyncs.put(lastProposed, l);
@@ -624,7 +625,7 @@
      * @param r
      */
             
-    public void sendSync(FollowerSyncRequest r){
+    public void sendSync(LearnerSyncRequest r){
         QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
         r.fh.queuePacket(qp);
     }
@@ -636,7 +637,7 @@
      * @param handler handler of the follower
      * @return last proposed zxid
      */
-    synchronized public long startForwarding(FollowerHandler handler,
+    synchronized public long startForwarding(LearnerHandler handler,
             long lastSeenZxid) {
         // Queue up any outstanding requests enabling the receipt of
         // new requests

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderBean.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderBean.java Fri Oct 30 16:19:13 2009
@@ -20,7 +20,7 @@
 
 import org.apache.zookeeper.server.ZooKeeperServerBean;
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
 import org.apache.zookeeper.server.quorum.Leader;
 
 /**
@@ -44,7 +44,7 @@
     
     public String followerInfo() {
         StringBuffer sb = new StringBuffer();
-        for (FollowerHandler handler : leader.followers) {
+        for (LearnerHandler handler : leader.learners) {
             sb.append(handler.toString()).append("\n");
         }
         return sb.toString();

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=831371&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Fri Oct 30 16:19:13 2009
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+/**
+ * This class is the superclass of two of the three main actors in a ZK
+ * ensemble: Followers and Observers. Both Followers and Observers share 
+ * a good deal of code which is moved into Peer to avoid duplication. 
+ */
+public class Learner {       
+    QuorumPeer self;
+    LearnerZooKeeperServer zk;
+    
+    protected BufferedOutputStream bufferedOutput;
+    
+    protected Socket sock;
+    
+    /**
+     * Socket getter
+     * @return 
+     */
+    public Socket getSocket() {
+        return sock;
+    }
+    
+    protected InputArchive leaderIs;
+    protected OutputArchive leaderOs;    
+    
+    protected static final Logger LOG = Logger.getLogger(Learner.class);
+
+    static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+    static {
+        LOG.info("TCP NoDelay set to: " + nodelay);
+    }   
+    
+    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations =
+        new ConcurrentHashMap<Long, ServerCnxn>();
+    
+    public int getPendingRevalidationsCount() {
+        return pendingRevalidations.size();
+    }
+    
+    /**
+     * validate a session for a client
+     *
+     * @param clientId
+     *                the client to be revalidated
+     * @param timeout
+     *                the timeout for which the session is valid
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
+            throws IOException, InterruptedException {
+        LOG.info("Revalidating client: " + clientId);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeLong(clientId);
+        dos.writeInt(timeout);
+        dos.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
+                .toByteArray(), null);
+        pendingRevalidations.put(clientId, cnxn);
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(LOG,
+                                     ZooTrace.SESSION_TRACE_MASK,
+                                     "To validate session 0x"
+                                     + Long.toHexString(clientId));
+        }
+        writePacket(qp, true);
+    }     
+    
+    /**
+     * write a packet to the leader
+     *
+     * @param pp
+     *                the proposal packet to be sent to the leader
+     * @throws IOException
+     */
+    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+        synchronized (leaderOs) {
+            if (pp != null) {
+                leaderOs.writeRecord(pp, "packet");
+            }
+            if (flush) {
+                bufferedOutput.flush();
+            }
+        }
+    }
+
+    /**
+     * read a packet from the leader
+     *
+     * @param pp
+     *                the packet to be instantiated
+     * @throws IOException
+     */
+    void readPacket(QuorumPacket pp) throws IOException {
+        synchronized (leaderIs) {
+            leaderIs.readRecord(pp, "packet");
+        }
+        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        if (pp.getType() == Leader.PING) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
+        }
+    }
+    
+    /**
+     * send a request packet to the leader
+     *
+     * @param request
+     *                the request from the client
+     * @throws IOException
+     */
+    void request(Request request) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream oa = new DataOutputStream(baos);
+        oa.writeLong(request.sessionId);
+        oa.writeInt(request.cxid);
+        oa.writeInt(request.type);
+        if (request.request != null) {
+            request.request.rewind();
+            int len = request.request.remaining();
+            byte b[] = new byte[len];
+            request.request.get(b);
+            request.request.rewind();
+            oa.write(b);
+        }
+        oa.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
+                .toByteArray(), request.authInfo);
+        writePacket(qp, true);
+    }
+    
+    /**
+     * Returns the address of the node we think is the leader.
+     */
+    protected InetSocketAddress findLeader() {
+        InetSocketAddress addr = null;
+        // Find the leader by id
+        Vote current = self.getCurrentVote();
+        for (QuorumServer s : self.getView().values()) {
+            if (s.id == current.id) {
+                addr = s.addr;
+                break;
+            }
+        }
+        if (addr == null) {
+            LOG.warn("Couldn't find the leader with id = "
+                    + current.id);
+        }
+        return addr;
+    }
+    
+    /**
+     * Establish a connection with the Leader found by findLeader. Retries
+     * 5 times before giving up. 
+     * @param addr - the address of the Leader to connect to.
+     * @throws IOException - if the socket connection fails on the 5th attempt
+     * @throws ConnectException
+     * @throws InterruptedException
+     */
+    protected void connectToLeader(InetSocketAddress addr) 
+    throws IOException, ConnectException, InterruptedException {
+        sock = new Socket();        
+        sock.setSoTimeout(self.tickTime * self.initLimit);
+        for (int tries = 0; tries < 5; tries++) {
+            try {
+                sock.connect(addr, self.tickTime * self.syncLimit);
+                sock.setTcpNoDelay(nodelay);
+                break;
+            } catch (IOException e) {
+                if (tries == 4) {
+                    LOG.error("Unexpected exception",e);
+                    throw e;
+                } else {
+                    LOG.warn("Unexpected exception, tries="+tries+
+                            ", connecting to " + addr,e);
+                    sock = new Socket();
+                    sock.setSoTimeout(self.tickTime * self.initLimit);
+                }
+            }
+            Thread.sleep(1000);
+        }
+        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
+                sock.getInputStream()));
+        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+    }   
+    
+    /**
+     * Once connected to the leader, perform the handshake protocol to
+     * establish a following / observing connection. 
+     * @param pktType
+     * @return the zxid the Leader sends for synchronization purposes.
+     * @throws IOException
+     */
+    protected long registerWithLeader(int pktType) throws IOException{
+        /*
+         * Send follower info, including last zxid and sid
+         */
+        QuorumPacket qp = new QuorumPacket();                
+        qp.setType(pktType);
+        long sentLastZxid = self.getLastLoggedZxid();
+        qp.setZxid(sentLastZxid);
+        
+        /*
+         * Add sid to payload
+         */
+        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
+        DataOutputStream dsid = new DataOutputStream(bsid);
+        dsid.writeLong(self.getId());
+        qp.setData(bsid.toByteArray());
+        
+        writePacket(qp, true);
+        readPacket(qp);        
+
+        if (qp.getType() != Leader.NEWLEADER) {
+            LOG.error("First packet should have been NEWLEADER");
+            throw new IOException("First packet should have been NEWLEADER");
+        }
+        
+        return qp.getZxid();
+    } 
+    
+    /**
+     * Finally, synchronize our history with the Leader. 
+     * @param newLeaderZxid
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
+        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+        QuorumPacket qp = new QuorumPacket();
+        
+        readPacket(qp);        
+        synchronized (zk) {
+            if (qp.getType() == Leader.DIFF) {
+                LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
+                zk.loadData();
+            }
+            else if (qp.getType() == Leader.SNAP) {
+                LOG.info("Getting a snapshot from leader");
+                // The leader is going to dump the database
+                zk.deserializeSnapshot(leaderIs);
+                String signature = leaderIs.readString("signature");
+                if (!signature.equals("BenWasHere")) {
+                    LOG.error("Missing signature. Got " + signature);
+                    throw new IOException("Missing signature");
+                }
+            } else if (qp.getType() == Leader.TRUNC) {
+                //we need to truncate the log to the lastzxid of the leader
+                LOG.warn("Truncating log to get in sync with the leader 0x"
+                        + Long.toHexString(qp.getZxid()));
+                boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
+                if (!truncated) {
+                    // not able to truncate the log
+                    LOG.fatal("Not able to truncate the log "
+                            + Long.toHexString(qp.getZxid()));
+                    System.exit(13);
+                }
+
+                zk.loadData();
+            }
+            else {
+                LOG.fatal("Got unexpected packet from leader "
+                        + qp.getType() + " exiting ... " );
+                System.exit(13);
+
+            }
+            zk.dataTree.lastProcessedZxid = newLeaderZxid;
+        }
+        ack.setZxid(newLeaderZxid & ~0xffffffffL);
+        writePacket(ack, true);
+        sock.setSoTimeout(self.tickTime * self.syncLimit);
+        zk.startup();
+    }
+    
+    protected void revalidate(QuorumPacket qp) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(qp
+                .getData());
+        DataInputStream dis = new DataInputStream(bis);
+        long sessionId = dis.readLong();
+        boolean valid = dis.readBoolean();
+        synchronized (pendingRevalidations) {
+            ServerCnxn cnxn = pendingRevalidations
+                    .remove(sessionId);
+            if (cnxn == null) {
+                LOG.warn("Missing session 0x"
+                        + Long.toHexString(sessionId)
+                        + " for validation");
+            } else {
+                cnxn.finishSessionInit(valid);
+            }
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(LOG,
+                    ZooTrace.SESSION_TRACE_MASK,
+                    "Session 0x" + Long.toHexString(sessionId)
+                    + " is valid: " + valid);
+        }
+    }
+        
+    protected void ping(QuorumPacket qp) throws IOException {
+        // Send back the ping with our session data
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        HashMap<Long, Integer> touchTable = zk
+                .getTouchSnapshot();
+        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
+            dos.writeLong(entry.getKey());
+            dos.writeInt(entry.getValue());
+        }
+        qp.setData(bos.toByteArray());
+        writePacket(qp, true);
+    }
+    
+    
+    /**
+     * Shutdown the Peer
+     */
+    public void shutdown() {
+        // set the zookeeper server to null
+        self.cnxnFactory.setZooKeeperServer(null);
+        // clear all the connections
+        self.cnxnFactory.clear();
+        // shutdown previous zookeeper
+        if (zk != null) {
+            zk.shutdown();
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=831371&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Fri Oct 30 16:19:13 2009
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * There will be an instance of this class created by the Leader for each
+ * learner. All communication with a learner is handled by this
+ * class.
+ */
+public class LearnerHandler extends Thread {
+    private static final Logger LOG = Logger.getLogger(LearnerHandler.class);
+
+    protected final Socket sock;    
+
+    public Socket getSocket() {
+        return sock;
+    }
+
+    final Leader leader;
+
+    long tickOfLastAck;
+    
+    /**
+     * ZooKeeper server identifier of this learner
+     */
+    protected long sid = 0;
+    
+    long getSid(){
+        return sid;
+    }                    
+
+    /**
+     * The packets to be sent to the learner
+     */
+    final LinkedBlockingQueue<QuorumPacket> queuedPackets =
+        new LinkedBlockingQueue<QuorumPacket>();
+
+    private BinaryInputArchive ia;
+
+    private BinaryOutputArchive oa;
+
+    private BufferedOutputStream bufferedOutput;
+
+    LearnerHandler(Socket sock, Leader leader) throws IOException {
+        super("LeanerHandler-" + sock.getRemoteSocketAddress());
+        this.sock = sock;
+        this.leader = leader;
+        leader.addLearnerHandler(this);
+    }
+    
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("LearnerHandler ").append(sock);
+        sb.append(" tickOfLastAck:").append(tickOfLastAck());
+        sb.append(" synced?:").append(synced());
+        sb.append(" queuedPacketLength:").append(queuedPackets.size());
+        return sb.toString();
+    }
+
+    /**
+     * If this packet is queued, the sender thread will exit
+     */
+    final QuorumPacket proposalOfDeath = new QuorumPacket();
+   
+    /**
+     * This method will use the thread to send packets added to the
+     * queuedPackets list
+     *
+     * @throws InterruptedException
+     */
+    private void sendPackets() throws InterruptedException {
+        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        while (true) {
+            try {
+                QuorumPacket p;
+                p = queuedPackets.poll();
+                if (p == null) {
+                    bufferedOutput.flush();
+                    p = queuedPackets.take();
+                }
+
+                if (p == proposalOfDeath) {
+                    // Packet of death!
+                    break;
+                }
+                if (p.getType() == Leader.PING) {
+                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+                }
+                if (LOG.isTraceEnabled()) {
+                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
+                }
+                oa.writeRecord(p, "packet");
+            } catch (IOException e) {
+                if (!sock.isClosed()) {
+                    LOG.warn("Unexpected exception",e);
+                }
+                break;
+            }
+        }
+    }
+
+    static public String packetToString(QuorumPacket p) {
+        if (true)
+            return null;
+        String type = null;
+        String mess = null;
+        Record txn = null;
+        
+        switch (p.getType()) {
+        case Leader.ACK:
+            type = "ACK";
+            break;
+        case Leader.COMMIT:
+            type = "COMMIT";
+            break;
+        case Leader.FOLLOWERINFO:
+            type = "FOLLOWERINFO";
+            break;    
+        case Leader.NEWLEADER:
+            type = "NEWLEADER";
+            break;
+        case Leader.PING:
+            type = "PING";
+            break;
+        case Leader.PROPOSAL:
+            type = "PROPOSAL";
+            BinaryInputArchive ia = BinaryInputArchive
+                    .getArchive(new ByteArrayInputStream(p.getData()));
+            TxnHeader hdr = new TxnHeader();
+            try {
+                txn = SerializeUtils.deserializeTxn(ia, hdr);
+                // mess = "transaction: " + txn.toString();
+            } catch (IOException e) {
+                LOG.warn("Unexpected exception",e);
+            }
+            break;
+        case Leader.REQUEST:
+            type = "REQUEST";
+            break;
+        case Leader.REVALIDATE:
+            type = "REVALIDATE";
+            ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
+            DataInputStream dis = new DataInputStream(bis);
+            try {
+                long id = dis.readLong();
+                mess = " sessionid = " + id;
+            } catch (IOException e) {
+                LOG.warn("Unexpected exception", e);
+            }
+
+            break;
+        case Leader.UPTODATE:
+            type = "UPTODATE";
+            break;
+        default:
+            type = "UNKNOWN" + p.getType();
+        }
+        String entry = null;
+        if (type != null) {
+            entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
+        }
+        return entry;
+    }
+
+    /**
+     * This thread will receive packets from the peer and process them and
+     * also listen to new connections from new peers.
+     */
+    @Override
+    public void run() {
+        try {
+            
+            ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
+                    .getInputStream()));
+            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+            oa = BinaryOutputArchive.getArchive(bufferedOutput);
+
+            QuorumPacket qp = new QuorumPacket();
+            ia.readRecord(qp, "packet");
+            if(qp.getType() != Leader.FOLLOWERINFO) {
+            	LOG.error("First packet " + qp.toString()
+                        + " is not FOLLOWERINFO!");
+                return;
+            }
+            if (qp.getData() != null) {
+            	ByteBuffer bbsid = ByteBuffer.wrap(qp.getData());
+                this.sid = bbsid.getLong();
+            } else {
+            	this.sid = leader.followerCounter.getAndDecrement();
+            }
+
+            LOG.info("Follower sid: " + this.sid + " : info : "
+                    + leader.self.quorumPeers.get(this.sid));
+            
+            /* this is the last zxid from the follower but the leader might have to
+              restart the follower from a different zxid depending on truncate and diff. */
+            long peerLastZxid = qp.getZxid();
+            /* the default to send to the follower */
+            int packetToSend = Leader.SNAP;
+            boolean logTxns = true;
+            long zxidToSend = 0;
+            
+            /** the packets that the follower needs to get updates from **/
+            long updates = peerLastZxid;
+            
+            /* we are sending the diff check if we have proposals in memory to be able to 
+             * send a diff to the 
+             */ 
+            synchronized(leader.zk.committedLog) {
+                if (leader.zk.committedLog.size() != 0) {
+                    if ((leader.zk.maxCommittedLog >= peerLastZxid)
+                            && (leader.zk.minCommittedLog <= peerLastZxid)) {
+                        packetToSend = Leader.DIFF;
+                        zxidToSend = leader.zk.maxCommittedLog;
+                        for (Proposal propose: leader.zk.committedLog) {
+                            if (propose.packet.getZxid() > peerLastZxid) {
+                                queuePacket(propose.packet);
+                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
+                                        null, null);
+                                queuePacket(qcommit);
+
+                            }
+                        }
+                    }
+                }
+                else {
+                    logTxns = false;
+                }            
+						}
+            
+            //check if we decided to send a diff or we need to send a truncate
+            // we avoid using epochs for truncating because epochs make things
+            // complicated. Two epochs might have the last 32 bits as same.
+            // only if we know that there is a committed zxid in the queue that
+            // is less than the one the peer has we send a trunc else to make
+            // things simple we just send sanpshot.
+            if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
+                // this is the only case that we are sure that
+                // we can ask the peer to truncate the log
+                packetToSend = Leader.TRUNC;
+                zxidToSend = leader.zk.maxCommittedLog;
+                updates = zxidToSend;
+            }
+            
+            /* see what other packets from the proposal
+             * and tobeapplied queues need to be sent
+             * and then decide if we can just send a DIFF
+             * or we actually need to send the whole snapshot
+             */
+            long leaderLastZxid = leader.startForwarding(this, updates);
+            // a special case when both the ids are the same 
+            if (peerLastZxid == leaderLastZxid) {
+                packetToSend = Leader.DIFF;
+                zxidToSend = leaderLastZxid;
+            }
+
+            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                    leaderLastZxid, null, null);
+            oa.writeRecord(newLeaderQP, "packet");
+            bufferedOutput.flush();
+            
+           
+            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
+            bufferedOutput.flush();
+            
+            /* if we are not truncating or sending a diff just send a snapshot */
+            if (packetToSend == Leader.SNAP) {
+                LOG.warn("Sending snapshot last zxid of peer is 0x"
+                        + Long.toHexString(peerLastZxid) + " " 
+                        + " zxid of leader is 0x"
+                        + Long.toHexString(leaderLastZxid));
+                // Dump data to peer
+                leader.zk.serializeSnapshot(oa);
+                oa.writeString("BenWasHere", "signature");
+            }
+            bufferedOutput.flush();
+            
+            // Mutation packets will be queued during the serialize,
+            // so we need to mark when the peer can actually start
+            // using the data
+            //
+            queuedPackets
+                    .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
+
+            // Start sending packets
+            new Thread() {
+                public void run() {
+                    Thread.currentThread().setName(
+                            "Sender-" + sock.getRemoteSocketAddress());
+                    try {
+                        sendPackets();
+                    } catch (InterruptedException e) {
+                        LOG.warn("Unexpected interruption",e);
+                    }
+                }
+            }.start();
+
+            while (true) {
+                qp = new QuorumPacket();
+                ia.readRecord(qp, "packet");
+
+                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+                if (qp.getType() == Leader.PING) {
+                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+                }
+                if (LOG.isTraceEnabled()) {
+                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
+                }
+                tickOfLastAck = leader.self.tick;
+
+
+                ByteBuffer bb;
+                long sessionId;
+                int cxid;
+                int type;
+
+                switch (qp.getType()) {
+                case Leader.ACK:
+                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+                    break;
+                case Leader.PING:
+                    // Process the touches
+                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
+                            .getData());
+                    DataInputStream dis = new DataInputStream(bis);
+                    while (dis.available() > 0) {
+                        long sess = dis.readLong();
+                        int to = dis.readInt();
+                        leader.zk.touch(sess, to);
+                    }
+                    break;
+                case Leader.REVALIDATE:
+                    bis = new ByteArrayInputStream(qp.getData());
+                    dis = new DataInputStream(bis);
+                    long id = dis.readLong();
+                    int to = dis.readInt();
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    DataOutputStream dos = new DataOutputStream(bos);
+                    dos.writeLong(id);
+                    boolean valid = leader.zk.touch(id, to);
+                    if (valid) {
+                        try {
+                            //set the session owner
+                            // as the follower that
+                            // owns the session
+                            leader.zk.setOwner(id, this);
+                        } catch (SessionExpiredException e) {
+                            LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
+                        }
+                    }
+                    if (LOG.isTraceEnabled()) {
+                        ZooTrace.logTraceMessage(LOG,
+                                                 ZooTrace.SESSION_TRACE_MASK,
+                                                 "Session 0x" + Long.toHexString(id)
+                                                 + " is valid: "+ valid);
+                    }
+                    dos.writeBoolean(valid);
+                    qp.setData(bos.toByteArray());
+                    queuedPackets.add(qp);
+                    break;
+                case Leader.REQUEST:                    
+                    bb = ByteBuffer.wrap(qp.getData());
+                    sessionId = bb.getLong();
+                    cxid = bb.getInt();
+                    type = bb.getInt();
+                    bb = bb.slice();
+                    if(type == OpCode.sync){
+                     	leader.zk.submitRequest(new LearnerSyncRequest(this, sessionId, cxid, type, bb,
+                                qp.getAuthinfo()));
+                    } else {
+                        Request si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
+                        si.setOwner(this);
+                        leader.zk.submitRequest(si);
+                    }
+                    break;
+                default:
+                }
+            }
+        } catch (IOException e) {
+            if (sock != null && !sock.isClosed()) {
+                LOG.error("Unexpected exception causing shutdown while sock "
+                        + "still open", e);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Unexpected exception causing shutdown", e);
+        } finally {
+            LOG.warn("******* GOODBYE " 
+                    + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
+                    + " ********");
+            // Send the packet of death
+            try {
+                queuedPackets.put(proposalOfDeath);
+            } catch (InterruptedException e) {
+                LOG.warn("Ignoring unexpected exception", e);
+            }
+            shutdown();
+        }
+    }
+
+    public void shutdown() {
+        try {
+            if (sock != null && !sock.isClosed()) {
+                sock.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Ignoring unexpected exception during socket close", e);
+        }
+        leader.removeLearnerHandler(this);
+    }
+
+    public long tickOfLastAck() {
+        return tickOfLastAck;
+    }
+
+    /**
+     * ping calls from the leader to the peers
+     */
+    public void ping() {
+        long id;
+        synchronized(leader) {
+            id = leader.lastProposed;
+        }
+        QuorumPacket ping = new QuorumPacket(Leader.PING, id,
+                null, null);
+        queuePacket(ping);
+    }
+
+    void queuePacket(QuorumPacket p) {
+        queuedPackets.add(p);
+    }
+
+    public boolean synced() {
+        return isAlive()
+                && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
+    }
+}

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java?rev=831371&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java Fri Oct 30 16:19:13 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.server.SessionTracker;
+import org.apache.zookeeper.server.SessionTrackerImpl;
+
+/**
+ * This is really just a shell of a SessionTracker that tracks session activity
+ * to be forwarded to the Leader using a PING.
+ */
+public class LearnerSessionTracker implements SessionTracker {
+    SessionExpirer expirer;
+
+    HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
+    long serverId = 1;
+    long nextSessionId=0;
+    
+    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+
+    
+    /**
+     * 
+     */
+    public LearnerSessionTracker(SessionExpirer expirer,
+            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, long id) {
+        this.expirer = expirer;
+        this.sessionsWithTimeouts = sessionsWithTimeouts;
+        this.serverId = id;
+        nextSessionId = SessionTrackerImpl.initializeNextSession(this.serverId);
+        
+    }
+
+    synchronized public void removeSession(long sessionId) {
+        sessionsWithTimeouts.remove(sessionId);
+        touchTable.remove(sessionId);
+    }
+
+    public void shutdown() {
+    }
+
+    synchronized public void addSession(long sessionId, int sessionTimeout) {
+        sessionsWithTimeouts.put(sessionId, sessionTimeout);
+        touchTable.put(sessionId, sessionTimeout);
+    }
+
+    synchronized public boolean touchSession(long sessionId, int sessionTimeout) {
+        touchTable.put(sessionId, sessionTimeout);
+        return true;
+    }
+
+    synchronized HashMap<Long, Integer> snapshot() {
+        HashMap<Long, Integer> oldTouchTable = touchTable;
+        touchTable = new HashMap<Long, Integer>();
+        return oldTouchTable;
+    }
+
+
+    synchronized public long createSession(int sessionTimeout) {
+        return (nextSessionId++);
+    }
+
+    public void checkSession(long sessionId, Object owner)  {
+        // Nothing to do here. Sessions are checked at the Leader
+    }
+    
+    public void setOwner(long sessionId, Object owner) {
+        // Nothing to do here. Sessions are checked at the Leader
+    }
+}

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java?rev=831371&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java Fri Oct 30 16:19:13 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.Request;
+
+public class LearnerSyncRequest extends Request {
+	LearnerHandler fh;
+	public LearnerSyncRequest(LearnerHandler fh, long sessionId, int xid, int type,
+			ByteBuffer bb, List<Id> authInfo) {
+		super(null, sessionId, xid, type, bb, authInfo);
+		this.fh = fh;
+	}
+}

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=831371&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Fri Oct 30 16:19:13 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * Parent class for all ZooKeeperServers for Learners 
+ */
+public abstract class LearnerZooKeeperServer extends ZooKeeperServer {    
+    
+    protected QuorumPeer self;
+    
+    public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory,tickTime,treeBuilder);
+    }
+
+    /**
+     * Abstract method to return the learner associated with this server.
+     * Since the Learner may change under our feet (when QuorumPeer reassigns
+     * it) we can't simply take a reference here. Instead, we need the 
+     * subclasses to implement this.     
+     */
+    abstract public Learner getLearner();        
+    
+    /**
+     * Returns the current state of the session tracker. This is only currently
+     * used by a Learner to build a ping response packet.
+     * 
+     */
+    protected HashMap<Long, Integer> getTouchSnapshot() {
+        if (sessionTracker != null) {
+            return ((LearnerSessionTracker) sessionTracker).snapshot();
+        }
+        return new HashMap<Long, Integer>();
+    }
+    
+    /**
+     * Returns the id of the associated QuorumPeer, which will do for a unique
+     * id of this server. 
+     */
+    public long getServerId() {
+        return self.getId();
+    }    
+    
+    /**
+     * Learners don't make use of this method, only Leaders.
+     */
+    @Override
+    public void addCommittedProposal(Request request) {
+        // Don't do anything!
+    }
+    
+    @Override
+    protected void createSessionTracker() {
+        sessionTracker = new LearnerSessionTracker(this, sessionsWithTimeouts,
+                self.getId());
+    }
+    
+    @Override
+    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
+            int sessionTimeout) throws IOException, InterruptedException {
+        getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+    }
+    
+    @Override
+    protected void registerJMX() {
+        // register with JMX
+        try {
+            jmxDataTreeBean = new DataTreeBean(dataTree);
+            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxDataTreeBean = null;
+        }
+    }
+
+    public void registerJMX(ZooKeeperServerBean serverBean,
+            LocalPeerBean localPeerBean)
+    {
+        // register with JMX
+        if (self.jmxLeaderElectionBean != null) {
+            try {
+                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
+            } catch (Exception e) {
+                LOG.warn("Failed to register with JMX", e);
+            }
+            self.jmxLeaderElectionBean = null;
+        }
+
+        try {
+            jmxServerBean = serverBean;
+            MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxServerBean = null;
+        }
+    }
+
+    @Override
+    protected void unregisterJMX() {
+        // unregister from JMX
+        try {
+            if (jmxDataTreeBean != null) {
+                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister with JMX", e);
+        }
+        jmxDataTreeBean = null;
+    }
+
+    protected void unregisterJMX(Learner peer) {
+        // unregister from JMX
+        try {
+            if (jmxServerBean != null) {
+                MBeanRegistry.getInstance().unregister(jmxServerBean);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister with JMX", e);
+        }
+        jmxServerBean = null;
+    }
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Fri Oct 30 16:19:13 2009
@@ -62,8 +62,8 @@
          * call processRequest on the next processor.
          */
         
-        if(request instanceof FollowerSyncRequest){
-            zks.getLeader().processSync((FollowerSyncRequest)request);
+        if(request instanceof LearnerSyncRequest){
+            zks.getLeader().processSync((LearnerSyncRequest)request);
         } else {
                 nextProcessor.processRequest(request);
             if (request.hdr != null) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Oct 30 16:19:13 2009
@@ -452,7 +452,7 @@
         try {
             jmxQuorumBean = new QuorumBean(this);
             MBeanRegistry.getInstance().register(jmxQuorumBean, null);
-            for(QuorumServer s: quorumPeers.values()){
+            for(QuorumServer s: getView().values()){
                 ZKMBeanInfo p;
                 if (getId() == s.id) {
                     p = jmxLocalPeerBean = new LocalPeerBean(this);
@@ -548,16 +548,33 @@
         }
     }
 
+    /**
+     * A 'view' is a node's current opinion of the membership of the
+     * ensemble. 
+     */
+    public Map<Long,QuorumPeer.QuorumServer> getView() {
+        return this.quorumPeers;
+    }
+    
+    /**
+     * Check if a node is in the current view. With static membership, the
+     * result of this check will never change; only when dynamic membership
+     * is introduced will this be more useful.
+     */
+    public boolean viewContains(Long sid) {
+        return this.quorumPeers.containsKey(sid);
+    }
+    
     public String[] getQuorumPeers() {
         List<String> l = new ArrayList<String>();
         synchronized (this) {
             if (leader != null) {
-                synchronized (leader.followers) {
-                    for (FollowerHandler fh : leader.followers) {
-                        if (fh.sock == null)
+                synchronized (leader.learners) {
+                    for (LearnerHandler fh : leader.learners) {
+                        if (fh.getSocket() == null)
                             continue;
-                        String s = fh.sock.getRemoteSocketAddress().toString();
-                        if (leader.isFollowerSynced(fh))
+                        String s = fh.getSocket().getRemoteSocketAddress().toString();
+                        if (leader.isLearnerSynced(fh))
                             s += "*";
                         l.add(s);
                     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java Fri Oct 30 16:19:13 2009
@@ -30,10 +30,10 @@
 public class SendAckRequestProcessor implements RequestProcessor, Flushable {
     private static final Logger LOG = Logger.getLogger(SendAckRequestProcessor.class);
     
-    Follower follower;
+    Learner learner;
 
-    SendAckRequestProcessor(Follower follower) {
-        this.follower = follower;
+    SendAckRequestProcessor(Learner peer) {
+        this.learner = peer;
     }
 
     public void processRequest(Request si) {
@@ -41,12 +41,12 @@
             QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                 null);
             try {
-                follower.writePacket(qp, false);
+                learner.writePacket(qp, false);
             } catch (IOException e) {
                 LOG.warn("Closing connection to leader, exception during packet send", e);
                 try {
-                    if (!follower.sock.isClosed()) {
-                        follower.sock.close();
+                    if (!learner.sock.isClosed()) {
+                        learner.sock.close();
                     }
                 } catch (IOException e1) {
                     // Nothing to do, we are shutting things down, so an exception here is irrelevant
@@ -58,12 +58,12 @@
     
     public void flush() throws IOException {
         try {
-            follower.writePacket(null, true);
+            learner.writePacket(null, true);
         } catch(IOException e) {
             LOG.warn("Closing connection to leader, exception during packet send", e);
             try {
-                if (!follower.sock.isClosed()) {
-                    follower.sock.close();
+                if (!learner.sock.isClosed()) {
+                    learner.sock.close();
                 }
             } catch (IOException e1) {
                     // Nothing to do, we are shutting things down, so an exception here is irrelevant

Modified: hadoop/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml?rev=831371&r1=831370&r2=831371&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml (original)
+++ hadoop/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml Fri Oct 30 16:19:13 2009
@@ -51,8 +51,8 @@
 
    <!-- Two unrecoverable errors while following the leader  -->
    <Match>
-     <Class name="org.apache.zookeeper.server.quorum.Follower" />
-       <Method name="followLeader" />
+     <Class name="org.apache.zookeeper.server.quorum.Learner" />
+       <Method name="syncWithLeader" />
        <Bug pattern="DM_EXIT" />
    </Match>
 
@@ -91,7 +91,7 @@
      <Bug code="IS"/>
   </Match>
   <Match>
-     <Class name="org.apache.zookeeper.server.quorum.FollowerSessionTracker"/>
+     <Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
        <Bug code="UrF"/>
   </Match>
   <Match>



Mime
View raw message