zookeeper-commits mailing list archives

From fang...@apache.org
Subject [3/3] zookeeper git commit: ZOOKEEPER-3140: Allow Followers to host Observers
Date Sun, 09 Dec 2018 02:17:09 GMT
ZOOKEEPER-3140: Allow Followers to host Observers

Creates a new abstraction, LearnerMaster, to represent the portions of the Leader logic that are used in LearnerHandler. Leader implements LearnerMaster and a new class ObserverMaster implements LearnerMaster. Followers have the option of instantiating an ObserverMaster thread when they assume their role and so support Learner traffic.

A new parameter 'observerMasterPort' is used to control which Follower instances host Observers.
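
The arrangement described above can be pictured with a minimal Java sketch. It is an illustration only, not the code in this commit: the interface is reduced to two hypothetical methods (the real LearnerMaster is far larger), the learner is a plain String, and the nested class names merely mirror the real ones. The one behaviour taken from the patch is that a Follower creates an ObserverMaster only when observerMasterPort is greater than zero.

```java
import java.util.ArrayList;
import java.util.List;

public class LearnerMasterSketch {
    /** Hypothetical, heavily reduced stand-in for the LearnerMaster abstraction. */
    interface LearnerMaster {
        void addLearnerHandler(String learner);
        int learnerCount();
    }

    /** Stand-in for Leader: it always hosts learners. */
    static class Leader implements LearnerMaster {
        private final List<String> learners = new ArrayList<>();
        public void addLearnerHandler(String learner) { learners.add(learner); }
        public int learnerCount() { return learners.size(); }
    }

    /** Stand-in for ObserverMaster: hosts observers on behalf of a Follower. */
    static class ObserverMaster implements LearnerMaster {
        final int port;
        private final List<String> observers = new ArrayList<>();
        ObserverMaster(int port) { this.port = port; }
        public void addLearnerHandler(String learner) { observers.add(learner); }
        public int learnerCount() { return observers.size(); }
    }

    /** Stand-in for Follower: om stays null unless a positive port is configured. */
    static class Follower {
        final ObserverMaster om;
        Follower(int observerMasterPort) {
            this.om = observerMasterPort > 0 ? new ObserverMaster(observerMasterPort) : null;
        }
        boolean hostsObservers() { return om != null; }
    }

    public static void main(String[] args) {
        Follower withPort = new Follower(2191);   // observerMasterPort=2191
        Follower withoutPort = new Follower(0);   // property not set
        withPort.om.addLearnerHandler("observer-1");
        System.out.println("hosts observers with port set: " + withPort.hostsObservers());
        System.out.println("hosts observers without port: " + withoutPort.hostsObservers());
    }
}
```

Code that serves learners can then be written against the LearnerMaster interface alone, which is what lets one LearnerHandler work with either a Leader or an ObserverMaster.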

Author: Brian Nixon <nixon@fb.com>

Reviewers: fangmin@apache.org, hanm@apache.org, eolivelli@gmail.com

Closes #628 from enixon/learner-master


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/b2513c11
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/b2513c11
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/b2513c11

Branch: refs/heads/master
Commit: b2513c114931dc377bac5e1d39e2f81c6e8cf17e
Parents: 6ea3c0b
Author: Brian Nixon <nixon@fb.com>
Authored: Sat Dec 8 18:16:58 2018 -0800
Committer: Fangmin Lyu <fangmin@apache.org>
Committed: Sat Dec 8 18:16:58 2018 -0800

----------------------------------------------------------------------
 .../main/resources/markdown/zookeeperAdmin.md   |   7 +
 .../resources/markdown/zookeeperObservers.md    |  32 +
 .../apache/zookeeper/server/ObserverBean.java   |  14 +
 .../apache/zookeeper/server/admin/Commands.java |  15 +
 .../zookeeper/server/quorum/Follower.java       |  46 +-
 .../zookeeper/server/quorum/FollowerBean.java   |  10 +
 .../zookeeper/server/quorum/FollowerMXBean.java |  10 +
 .../server/quorum/FollowerRequestProcessor.java |  41 +-
 .../server/quorum/FollowerZooKeeperServer.java  |  32 +-
 .../apache/zookeeper/server/quorum/Leader.java  | 166 +++-
 .../server/quorum/LeaderZooKeeperServer.java    |  11 +
 .../apache/zookeeper/server/quorum/Learner.java |  28 +-
 .../zookeeper/server/quorum/LearnerHandler.java | 166 ++--
 .../server/quorum/LearnerHandlerBean.java       |  66 ++
 .../server/quorum/LearnerHandlerMXBean.java     |  29 +
 .../zookeeper/server/quorum/LearnerMaster.java  | 196 +++++
 .../zookeeper/server/quorum/Observer.java       |  98 ++-
 .../zookeeper/server/quorum/ObserverMXBean.java |  12 +
 .../zookeeper/server/quorum/ObserverMaster.java | 532 +++++++++++++
 .../zookeeper/server/quorum/QuorumPeer.java     |  90 ++-
 .../server/quorum/QuorumPeerConfig.java         |  12 +
 .../zookeeper/server/quorum/QuorumPeerMain.java |   1 +
 .../java/org/apache/zookeeper/ZKTestCase.java   |  26 +
 .../server/quorum/DelayRequestProcessor.java    |  77 ++
 .../server/quorum/LearnerHandlerTest.java       |  55 +-
 .../zookeeper/server/util/PortForwarder.java    | 105 +--
 .../zookeeper/test/ObserverMasterTest.java      | 780 +++++++++++++++++++
 .../org/apache/zookeeper/test/ObserverTest.java | 163 +---
 .../org/apache/zookeeper/test/ReconfigTest.java |  16 +-
 .../src/test/resources/findbugsExcludeFile.xml  |   8 +
 30 files changed, 2459 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index bb7d792..0e92cf9 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -550,6 +550,13 @@ in the configuration file:
     Note that SSL feature will be enabled when user plugs-in
     zookeeper.serverCnxnFactory, zookeeper.clientCnxnSocket as Netty.
 
+* *observerMasterPort* :
+    the port to listen for observer connections; that is, the
+    port that observers attempt to connect to.
+    if the property is set then the server will host observer connections
+    when in follower mode in addition to when in leader mode and correspondingly
+    attempt to connect to any voting peer when in observer mode.
+
 * *dataDir* :
     the location where ZooKeeper will store the in-memory
     database snapshots and, unless specified otherwise, the

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md b/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md
index 4642b13..7865723 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md
@@ -81,6 +81,38 @@ specified in every config file. You should see a command line prompt
 through which you can issue commands like _ls_ to query
 the ZooKeeper service.
 
+<a name="ch_ObserverMasters"></a>
+
+## How to use Observer Masters
+
+Observers function simply as non-voting members of the ensemble, sharing
+the Learner interface with Followers and holding only a slightly different
+internal pipeline. Both maintain connections along the quorum port with the
+Leader by which they learn of all new proposals on the ensemble.
+
+By default, Observers connect to the Leader of the quorum along its
+quorum port and this is how they learn of all new proposals on the
+ensemble. There are benefits to allowing Observers to connect to the
+Followers instead as a means of plugging in to the commit stream in place
+of connecting to the Leader. It shifts the burden of supporting Observers
+off the Leader and allows it to focus on coordinating the commit of writes.
+This means better performance when the Leader is under high load,
+particularly high network load such as can happen after a leader election
+when many Learners need to sync. It reduces the total network connections
+maintained on the Leader when there are a high number of observers.
+Activating Followers to support Observers allows the overall number of
+Observers to scale into the hundreds. On the other end, Observer
+availability is improved since it will take a shorter time for a high
+number of Observers to finish syncing and start serving client traffic.
+
+This feature can be activated by letting all members of the ensemble know
+which port will be used by the Followers to listen for Observer
+connections. The following entry, when added to the server config file,
+will instruct Observers to connect to peers (Leaders and Followers) on
+port 2191 and instruct Followers to create an ObserverMaster thread to
+listen and serve on that port.
+
+    observerMasterPort=2191
 <a name="ch_UseCases"></a>
 
 ## Example use cases

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java
index 72d724e..167c96d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server;
 
 import org.apache.zookeeper.server.quorum.Observer;
 import org.apache.zookeeper.server.quorum.ObserverMXBean;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
 
 /**
  * ObserverBean
@@ -46,4 +47,17 @@ public class ObserverBean extends ZooKeeperServerBean implements ObserverMXBean{
         return observer.getSocket().toString();
     }
 
+    public String getLearnerMaster() {
+        QuorumPeer.QuorumServer learnerMaster = observer.getCurrentLearnerMaster();
+        if (learnerMaster == null || learnerMaster.addr == null) {
+            return "Unknown";
+        }
+        return learnerMaster.addr.getAddress().getHostAddress() + ":" + learnerMaster.addr.getPort();
+    }
+
+    public void setLearnerMaster(String learnerMaster) {
+        if (!observer.setLearnerMaster(learnerMaster)) {
+            throw new IllegalArgumentException("Not a valid learner master");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 4cfa772..29e1845 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -37,10 +37,13 @@ import org.apache.zookeeper.server.ServerStats;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.Follower;
+import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.quorum.ObserverZooKeeperServer;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.slf4j.Logger;
@@ -375,6 +378,18 @@ public class Commands {
                 response.put("min_proposal_size", leader.getProposalStats().getMinBufferSize());
             }
 
+            if (zkServer instanceof FollowerZooKeeperServer) {
+                Follower follower = ((FollowerZooKeeperServer) zkServer).getFollower();
+                Integer syncedObservers = follower.getSyncedObserverSize();
+                if (syncedObservers != null) {
+                    response.put("synced_observers", syncedObservers);
+                }
+            }
+
+            if (zkServer instanceof ObserverZooKeeperServer) {
+                response.put("observer_master_id", ((ObserverZooKeeperServer)zkServer).getObserver().getLearnerMasterId());
+            }
+
             response.putAll(ServerMetrics.getAllValues());
 
             return response;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index 78ae7aa..49280d3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -42,7 +42,9 @@ public class Follower extends Learner{
     private long lastQueued;
     // This is the same object as this.zk, but we cache the downcast op
     final FollowerZooKeeperServer fzk;
-    
+
+    ObserverMaster om;
+
     Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
         this.self = self;
         this.zk=zk;
@@ -96,6 +98,15 @@ public class Follower extends Learner{
                     long syncTime = Time.currentElapsedTime() - startTime;
                     ServerMetrics.FOLLOWER_SYNC_TIME.add(syncTime);
                 }
+                if (self.getObserverMasterPort() > 0) {
+                    LOG.info("Starting ObserverMaster");
+
+                    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
+                    om.start();
+                } else {
+                    om = null;
+                }
+                // create a reusable packet to reduce gc impact
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning()) {
                     readPacket(qp);
@@ -113,6 +124,9 @@ public class Follower extends Learner{
                 pendingRevalidations.clear();
             }
         } finally {
+            if (om != null) {
+                om.stop();
+            }
             zk.unregisterJMX((Learner)this);
         }
     }
@@ -145,9 +159,16 @@ public class Follower extends Learner{
             }
             
             fzk.logRequest(hdr, txn);
+
+            if (om != null) {
+                om.proposalReceived(qp);
+            }
             break;
         case Leader.COMMIT:
             fzk.commit(qp.getZxid());
+            if (om != null) {
+                om.proposalCommitted(qp.getZxid());
+            }
             break;
             
         case Leader.COMMITANDACTIVATE:
@@ -159,11 +180,16 @@ public class Follower extends Learner{
            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
            long suggestedLeaderId = buffer.getLong();
-            boolean majorChange = 
-                   self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
-           // commit (writes the new config to ZK tree (/zookeeper/config)                     
-           fzk.commit(qp.getZxid());
-            if (majorChange) {
+           final long zxid = qp.getZxid();
+           boolean majorChange =
+                   self.processReconfig(qv, suggestedLeaderId, zxid, true);
+           // commit (writes the new config to ZK tree (/zookeeper/config)
+           fzk.commit(zxid);
+
+           if (om != null) {
+               om.informAndActivate(zxid, suggestedLeaderId);
+           }
+           if (majorChange) {
                throw new Exception("changes proposed in reconfig");
            }
            break;
@@ -171,7 +197,9 @@ public class Follower extends Learner{
             LOG.error("Received an UPTODATE message after Follower started");
             break;
         case Leader.REVALIDATE:
-            revalidate(qp);
+            if (om == null || !om.revalidateLearnerSession(qp)) {
+                revalidate(qp);
+            }
             break;
         case Leader.SYNC:
             fzk.sync();
@@ -205,6 +233,10 @@ public class Follower extends Learner{
         return lastQueued;
     }
 
+    public Integer getSyncedObserverSize() {
+        return  om == null ? null : om.getNumActiveObservers();
+    }
+
     @Override
     public void shutdown() {    
         LOG.info("shutdown called", new Exception("shutdown Follower"));

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
index 8773ab8..edfc9c7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
@@ -52,4 +52,14 @@ public class FollowerBean extends ZooKeeperServerBean implements FollowerMXBean
     public long getElectionTimeTaken() {
         return follower.self.getElectionTimeTaken();
     }
+
+    @Override
+    public int getObserverMasterPacketSizeLimit() {
+        return follower.om == null ? -1 : follower.om.getPktsSizeLimit();
+    }
+
+    @Override
+    public void setObserverMasterPacketSizeLimit(int sizeLimit) {
+        ObserverMaster.setPktsSizeLimit(sizeLimit);
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
index 45c7fd8..6b4edd0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
@@ -43,4 +43,14 @@ public interface FollowerMXBean extends ZooKeeperServerMXBean {
      * @return time taken for leader election in milliseconds.
      */
     public long getElectionTimeTaken();
+
+    /**
+     * @return the size limit in bytes for the observer master commit packet queue
+     */
+    public int getObserverMasterPacketSizeLimit();
+
+    /**
+     * set the size limit in bytes for the observer master commit packet queue
+     */
+    public void setObserverMasterPacketSizeLimit(int sizeLimit);
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
index c623eba..2f345a8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
@@ -111,26 +111,33 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
     }
 
     public void processRequest(Request request) {
+        processRequest(request, true);
+    }
+
+    void processRequest(Request request, boolean checkForUpgrade) {
         if (!finished) {
-            // Before sending the request, check if the request requires a
-            // global session and what we have is a local session. If so do
-            // an upgrade.
-            Request upgradeRequest = null;
-            try {
-                upgradeRequest = zks.checkUpgradeSession(request);
-            } catch (KeeperException ke) {
-                if (request.getHdr() != null) {
-                    request.getHdr().setType(OpCode.error);
-                    request.setTxn(new ErrorTxn(ke.code().intValue()));
+            if (checkForUpgrade) {
+                // Before sending the request, check if the request requires a
+                // global session and what we have is a local session. If so do
+                // an upgrade.
+                Request upgradeRequest = null;
+                try {
+                    upgradeRequest = zks.checkUpgradeSession(request);
+                } catch (KeeperException ke) {
+                    if (request.getHdr() != null) {
+                        request.getHdr().setType(OpCode.error);
+                        request.setTxn(new ErrorTxn(ke.code().intValue()));
+                    }
+                    request.setException(ke);
+                    LOG.info("Error creating upgrade request", ke);
+                } catch (IOException ie) {
+                    LOG.error("Unexpected error in upgrade", ie);
+                }
+                if (upgradeRequest != null) {
+                    queuedRequests.add(upgradeRequest);
                 }
-                request.setException(ke);
-                LOG.info("Error creating upgrade request",  ke);
-            } catch (IOException ie) {
-                LOG.error("Unexpected error in upgrade", ie);
-            }
-            if (upgradeRequest != null) {
-                queuedRequests.add(upgradeRequest);
             }
+
             queuedRequests.add(request);
         }
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 78c12db..8f88bd3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.Record;
+import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.ExitCode;
@@ -34,6 +35,8 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.txn.TxnHeader;
 
+import javax.management.JMException;
+
 /**
  * Just like the standard ZooKeeperServer. We just replace the request
  * processors: FollowerRequestProcessor -> CommitProcessor ->
@@ -113,13 +116,17 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     }
 
     synchronized public void sync(){
-        if(pendingSyncs.size() ==0){
+        if(pendingSyncs.size() == 0) {
             LOG.warn("Not expecting a sync.");
             return;
         }
 
         Request r = pendingSyncs.remove();
-		commitProcessor.commit(r);
+        if (r instanceof LearnerSyncRequest) {
+            LearnerSyncRequest lsr = (LearnerSyncRequest)r;
+            lsr.fh.queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null));
+        }
+        commitProcessor.commit(r);
     }
 
     @Override
@@ -139,4 +146,25 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     public Learner getLearner() {
         return getFollower();
     }
+
+    /**
+     * Process a request received from external Learner through the LearnerMaster
+     * These requests have already passed through validation and checks for
+     * session upgrade and can be injected into the middle of the pipeline.
+     *
+     * @param request received from external Learner
+     */
+    void processObserverRequest(Request request) {
+        ((FollowerRequestProcessor)firstProcessor).processRequest(request, false);
+    }
+
+    boolean registerJMX(LearnerHandlerBean handlerBean) {
+        try {
+            MBeanRegistry.getInstance().register(handlerBean, jmxServerBean);
+            return true;
+        } catch (JMException e) {
+            LOG.warn("Could not register connection", e);
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 0a892b1..721a1b4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -19,6 +19,10 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.ServerSocket;
@@ -41,15 +45,20 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.security.sasl.SaslException;
 
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperCriticalThread;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
@@ -60,7 +69,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This class has the control logic for the Leader.
  */
-public class Leader {
+public class Leader implements LearnerMaster {
     private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
 
     static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true");
@@ -118,6 +127,9 @@ public class Leader {
                 maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
     }
 
+    // beans for all learners
+    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>();
+
     /**
      * Returns a copy of the current learner snapshot
      */
@@ -181,7 +193,8 @@ public class Leader {
      * @param learner
      *                instance of learner handle
      */
-    void addLearnerHandler(LearnerHandler learner) {
+    @Override
+    public void addLearnerHandler(LearnerHandler learner) {
         synchronized (learners) {
             learners.add(learner);
         }
@@ -192,7 +205,8 @@ public class Leader {
      *
      * @param peer
      */
-    void removeLearnerHandler(LearnerHandler peer) {
+    @Override
+    public void removeLearnerHandler(LearnerHandler peer) {
         synchronized (forwardingFollowers) {
             forwardingFollowers.remove(peer);
         }
@@ -866,6 +880,7 @@ public class Leader {
      * @param sid, the id of the server that sent the ack
      * @param followerAddr
      */
+    @Override
     synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
         if (!allowedToCommit) return; // last op committed was a leader change - from now on
                                      // the new leader should commit
@@ -1064,23 +1079,30 @@ public class Leader {
         sendObserverPacket(qp);
     }
 
+    public static QuorumPacket buildInformAndActivePacket(long zxid,
+            long designatedLeader, byte[] proposalData) {
+        byte[] data = new byte[proposalData.length + 8];
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        buffer.putLong(designatedLeader);
+        buffer.put(proposalData);
+
+        return new QuorumPacket(Leader.INFORMANDACTIVATE, zxid, data, null);
+    }
 
     /**
      * Create an inform&activate packet and send it to all observers.
      */
     public void informAndActivate(Proposal proposal, long designatedLeader) {
-       byte[] proposalData = proposal.packet.getData();
-        byte[] data = new byte[proposalData.length + 8];
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-       buffer.putLong(designatedLeader);
-       buffer.put(proposalData);
-
-        QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null);
-        sendObserverPacket(qp);
+        sendObserverPacket(buildInformAndActivePacket(proposal.request.zxid,
+                designatedLeader, proposal.packet.getData()));
     }
 
     long lastProposed;
 
+    @Override
+    synchronized public long getLastProposed() {
+        return lastProposed;
+    }
 
     /**
      * Returns the current epoch of the leader.
@@ -1146,6 +1168,7 @@ public class Leader {
         return p;
     }
 
+    @Override
     public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
         return learnerSnapshotThrottler;
     }
@@ -1185,6 +1208,7 @@ public class Leader {
      * @return last proposed zxid
      * @throws InterruptedException
      */
+    @Override
     synchronized public long startForwarding(LearnerHandler handler,
             long lastSeenZxid) {
         // Queue up any outstanding requests enabling the receipt of
@@ -1221,6 +1245,16 @@ public class Leader {
 
         return lastProposed;
     }
+
+    @Override
+    public void waitForStartup() throws InterruptedException {
+        synchronized(zk) {
+            while(!zk.isRunning() && !Thread.currentThread().isInterrupted()) {
+                zk.wait(20);
+            }
+        }
+    }
+
     // VisibleForTesting
     protected final Set<Long> connectingFollowers = new HashSet<Long>();
 
@@ -1277,6 +1311,7 @@ public class Leader {
         }
     }
 
+    @Override
     public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
         synchronized(connectingFollowers) {
             if (!waitingForNewEpoch) {
@@ -1313,10 +1348,17 @@ public class Leader {
         }
     }
 
+    @Override
+    public ZKDatabase getZKDatabase() {
+        return zk.getZKDatabase();
+    }
+
     // VisibleForTesting
     protected final Set<Long> electingFollowers = new HashSet<Long>();
     // VisibleForTesting
     protected boolean electionFinished = false;
+
+    @Override
     public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
         synchronized(electingFollowers) {
             if (electionFinished) {
@@ -1417,6 +1459,7 @@ public class Leader {
      * @param sid
      * @throws InterruptedException
      */
+    @Override
     public void waitForNewLeaderAck(long sid, long zxid)
             throws InterruptedException {
 
@@ -1517,4 +1560,105 @@ public class Leader {
     private boolean isParticipant(long sid) {
         return self.getQuorumVerifier().getVotingMembers().containsKey(sid);
     }
+
+    @Override
+    public int getCurrentTick() {
+        return self.tick.get();
+    }
+
+    @Override
+    public int syncTimeout() {
+        return self.tickTime * self.syncLimit;
+    }
+
+    @Override
+    public int getTickOfNextAckDeadline() {
+        return self.tick.get() + self.syncLimit;
+    }
+
+    @Override
+    public int getTickOfInitialAckDeadline() {
+        return self.tick.get() + self.initLimit + self.syncLimit;
+    }
+
+    @Override
+    public long getAndDecrementFollowerCounter() {
+        return followerCounter.getAndDecrement();
+    }
+
+    @Override
+    public void touch(long sess, int to) {
+        zk.touch(sess, to);
+    }
+
+    @Override
+    public void submitLearnerRequest(Request si) {
+        zk.submitLearnerRequest(si);
+    }
+
+    @Override
+    public long getQuorumVerifierVersion() {
+        return self.getQuorumVerifier().getVersion();
+    }
+
+    @Override
+    public String getPeerInfo(long sid) {
+        QuorumPeer.QuorumServer server = self.getView().get(sid);
+        return server == null ? "" : server.toString();
+    }
+
+    @Override
+    public byte[] getQuorumVerifierBytes() {
+        return self.getLastSeenQuorumVerifier().toString().getBytes();
+    }
+
+    @Override
+    public QuorumAuthServer getQuorumAuthServer() {
+        return (self == null) ? null : self.authServer;
+    }
+
+    @Override
+    public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+        DataInputStream dis = new DataInputStream(bis);
+        long id = dis.readLong();
+        int to = dis.readInt();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        dos.writeLong(id);
+        boolean valid = zk.checkIfValidGlobalSession(id, to);
+        if (valid) {
+            try {
+                // set the session owner as the follower that owns the session
+                zk.setOwner(id, learnerHandler);
+            } catch (KeeperException.SessionExpiredException e) {
+                LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
+            }
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(LOG,
+                    ZooTrace.SESSION_TRACE_MASK,
+                    "Session 0x" + Long.toHexString(id)
+                            + " is valid: "+ valid);
+        }
+        dos.writeBoolean(valid);
+        qp.setData(bos.toByteArray());
+        learnerHandler.queuePacket(qp);
+    }
+
+    @Override
+    public void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket) {
+        LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket);
+        if (zk.registerJMX(bean)) {
+            connectionBeans.put(learnerHandler, bean);
+        }
+    }
+
+    @Override
+    public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) {
+        LearnerHandlerBean bean = connectionBeans.remove(learnerHandler);
+        if (bean != null){
+            MBeanRegistry.getInstance().unregister(bean);
+        }
+    }
 }
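
Note on the REVALIDATE payload handled by revalidateSession above: the request data holds a session id (long) followed by a timeout (int), and the reply holds the same session id followed by a validity flag (boolean). The following is a minimal sketch of that byte layout only. The class and method names (RevalidatePayloadSketch, encodeRequest, buildReply, readValidity) are hypothetical and not part of ZooKeeper, and the validity decision is passed in as a parameter instead of being computed by checkIfValidGlobalSession.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of the REVALIDATE payload layout; not ZooKeeper code.
final class RevalidatePayloadSketch {

    // Request payload: 8-byte session id followed by 4-byte timeout.
    static byte[] encodeRequest(long sessionId, int timeout) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(sessionId);
            dos.writeInt(timeout);
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reply payload: the session id read from the request, then a 1-byte boolean.
    static byte[] buildReply(byte[] request, boolean valid) {
        try {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(request));
            long id = dis.readLong();
            dis.readInt(); // timeout is consumed but not echoed back
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(id);
            dos.writeBoolean(valid);
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the validity flag back out of a reply payload.
    static boolean readValidity(byte[] reply) {
        try {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(reply));
            dis.readLong(); // skip the session id
            return dis.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] request = encodeRequest(0x1234L, 30000);
        byte[] reply = buildReply(request, true);
        System.out.println("request bytes: " + request.length);
        System.out.println("reply bytes: " + reply.length);
        System.out.println("valid: " + readValidity(reply));
    }
}
```

In the commit itself the reply is written back into the same QuorumPacket and queued on the LearnerHandler, so the learner that asked receives the answer on its existing connection.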

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index e1d1bb6..6484e30 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -30,6 +30,7 @@ import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
+import javax.management.JMException;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
@@ -186,6 +187,16 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
         }
     }
 
+    boolean registerJMX(LearnerHandlerBean handlerBean) {
+        try {
+            MBeanRegistry.getInstance().register(handlerBean, jmxServerBean);
+            return true;
+        } catch (JMException e) {
+            LOG.warn("Could not register connection", e);
+        }
+        return false;
+    }
+
     @Override
     protected void unregisterJMX() {
         // unregister from JMX

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index faaa844..307b644 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -87,9 +87,17 @@ public class Learner {
     
     protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
 
+    /**
+     * Time to wait after connection attempt with the Leader or LearnerMaster before this
+     * Learner tries to connect again.
+     */
+    private static final int leaderConnectDelayDuringRetryMs =
+            Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
+
     static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
     static {
-        LOG.info("TCP NoDelay set to: " + nodelay);
+        LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
+        LOG.info("TCP NoDelay set to: {}", nodelay);
     }   
     
     final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations =
@@ -235,9 +243,10 @@ public class Learner {
     }
 
     /**
-     * Establish a connection with the Leader found by findLeader. Retries
-     * until either initLimit time has elapsed or 5 tries have happened. 
-     * @param addr - the address of the Leader to connect to.
+     * Establish a connection with the LearnerMaster found by findLearnerMaster.
+     * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
+     * Retries until either initLimit time has elapsed or 5 tries have happened.
+     * @param addr - the address of the Peer to connect to.
      * @throws IOException - if the socket connection fails on the 5th attempt
      * <li>if there is an authentication failure while connecting to leader</li>
      * @throws ConnectException
@@ -248,7 +257,7 @@ public class Learner {
         this.sock = createSocket();
 
         int initLimitTime = self.tickTime * self.initLimit;
-        int remainingInitLimitTime = initLimitTime;
+        int remainingInitLimitTime;
         long startNanoTime = nanoTime();
 
         for (int tries = 0; tries < 5; tries++) {
@@ -286,7 +295,7 @@ public class Learner {
                     this.sock = createSocket();
                 }
             }
-            Thread.sleep(1000);
+            Thread.sleep(leaderConnectDelayDuringRetryMs);
         }
 
         self.authLearner.authenticate(sock, hostname);
@@ -309,8 +318,8 @@ public class Learner {
     }
 
     /**
-     * Once connected to the leader, perform the handshake protocol to
-     * establish a following / observing connection. 
+     * Once connected to the leader or learner master, perform the handshake
+     * protocol to establish a following / observing connection.
      * @param pktType
      * @return the zxid the Leader sends for synchronization purposes.
      * @throws IOException
@@ -369,7 +378,8 @@ public class Learner {
     } 
     
     /**
-     * Finally, synchronize our history with the Leader. 
+     * Finally, synchronize our history with the Leader (if Follower)
+     * or the LearnerMaster (if Observer).
      * @param newLeaderZxid
      * @throws IOException
      * @throws InterruptedException
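
Note on the retry delay introduced in Learner.java above: the fixed one-second sleep between connection attempts is replaced by a value read once from a system property through Integer.getInteger, with a default of 100 ms. A minimal sketch of that pattern follows, under stated assumptions: the property name example.connectDelayDuringRetryMs, the class RetryDelaySketch and the BooleanSupplier standing in for a socket connect are all hypothetical, and the initLimit time budget that the real loop also enforces is left out.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration of a property-driven retry delay; not ZooKeeper code.
final class RetryDelaySketch {

    // Assumed property name, used only for this sketch.
    static final String DELAY_PROPERTY = "example.connectDelayDuringRetryMs";

    // Integer.getInteger returns the default when the property is unset or unparsable.
    static int retryDelayMs() {
        return Integer.getInteger(DELAY_PROPERTY, 100);
    }

    // Returns the 1-based number of the attempt that succeeded, or -1 if all
    // attempts failed or the thread was interrupted while waiting.
    static int connectWithRetry(BooleanSupplier attempt, int maxTries, long delayMs) {
        for (int tries = 1; tries <= maxTries; tries++) {
            if (attempt.getAsBoolean()) {
                return tries;
            }
            if (tries < maxTries) {
                try {
                    Thread.sleep(delayMs); // wait before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connect that fails twice and then succeeds.
        int succeededOn = connectWithRetry(() -> ++calls[0] == 3, 5, retryDelayMs());
        System.out.println("succeeded on attempt " + succeededOn);
    }
}
```

With a short delay, an Observer that loses its LearnerMaster can attempt reconnection without waiting a full second per try; the value can be overridden with a -D option on the JVM command line.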

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index bc84916..78429e0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -21,9 +21,7 @@ package org.apache.zookeeper.server.quorum;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
@@ -37,8 +35,6 @@ import javax.security.sasl.SaslException;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerMetrics;
@@ -48,6 +44,7 @@ import org.apache.zookeeper.server.ZooKeeperThread;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -68,7 +65,7 @@ public class LearnerHandler extends ZooKeeperThread {
         return sock;
     }
 
-    final Leader leader;
+    final LearnerMaster learnerMaster;
 
     /** Deadline for receiving the next ack. If we are bootstrapping then
      * it's based on the initLimit, if we are done bootstrapping it's based
@@ -85,6 +82,10 @@ public class LearnerHandler extends ZooKeeperThread {
         return sid;
     }
 
+    String getRemoteAddress() {
+        return sock == null ? "<null>" : sock.getRemoteSocketAddress().toString();
+    }
+
     protected int version = 0x1;
 
     int getVersion() {
@@ -147,7 +148,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 return true;
             } else {
                 long msDelay = (time - currentTime) / 1000000;
-                return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
+                return (msDelay < learnerMaster.syncTimeout());
             }
         }
     };
@@ -167,7 +168,7 @@ public class LearnerHandler extends ZooKeeperThread {
     private volatile boolean sendingThreadStarted = false;
 
     /**
-     * For testing purpose, force leader to use snapshot to sync with followers
+     * For testing purpose, force learnerMaster to use snapshot to sync with followers
      */
     public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync";
     private boolean forceSnapSync = false;
@@ -183,10 +184,10 @@ public class LearnerHandler extends ZooKeeperThread {
      */
     private long leaderLastZxid;
 
-    LearnerHandler(Socket sock, BufferedInputStream bufferedInput,Leader leader) throws IOException {
+    LearnerHandler(Socket sock, BufferedInputStream bufferedInput, LearnerMaster learnerMaster) throws IOException {
         super("LearnerHandler-" + sock.getRemoteSocketAddress());
         this.sock = sock;
-        this.leader = leader;
+        this.learnerMaster = learnerMaster;
         this.bufferedInput = bufferedInput;
 
         if (Boolean.getBoolean(FORCE_SNAP_SYNC)) {
@@ -195,9 +196,9 @@ public class LearnerHandler extends ZooKeeperThread {
         }
 
         try {
-            if (leader.self != null) {
-                leader.self.authServer.authenticate(sock,
-                        new DataInputStream(bufferedInput));
+            QuorumAuthServer authServer = learnerMaster.getQuorumAuthServer();
+            if (authServer != null) {
+                authServer.authenticate(sock, new DataInputStream(bufferedInput));
             }
         } catch (IOException e) {
             LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection",
@@ -368,9 +369,8 @@ public class LearnerHandler extends ZooKeeperThread {
     @Override
     public void run() {
         try {
-            leader.addLearnerHandler(this);
-            tickOfNextAckDeadline = leader.self.tick.get()
-                    + leader.self.initLimit + leader.self.syncLimit;
+            learnerMaster.addLearnerHandler(this);
+            tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
 
             ia = BinaryInputArchive.getArchive(bufferedInput);
             bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
@@ -384,6 +384,9 @@ public class LearnerHandler extends ZooKeeperThread {
                 return;
             }
 
+            if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
+                throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
+            }
             byte learnerInfoData[] = qp.getData();
             if (learnerInfoData != null) {
                 ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
@@ -395,31 +398,34 @@ public class LearnerHandler extends ZooKeeperThread {
                 }
                 if (learnerInfoData.length >= 20) {
                     long configVersion = bbsid.getLong();
-                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
+                    if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                         throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                     }
                 }
             } else {
-                this.sid = leader.followerCounter.getAndDecrement();
+                this.sid = learnerMaster.getAndDecrementFollowerCounter();
             }
 
-            if (leader.self.getView().containsKey(this.sid)) {
-                LOG.info("Follower sid: " + this.sid + " : info : "
-                        + leader.self.getView().get(this.sid).toString());
+            String followerInfo = learnerMaster.getPeerInfo(this.sid);
+            if (followerInfo.isEmpty()) {
+                LOG.info("Follower sid: " + this.sid + " not in the current config "
+                        + Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
             } else {
-                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
+                LOG.info("Follower sid: " + this.sid + " : info : " + followerInfo);
             }
 
             if (qp.getType() == Leader.OBSERVERINFO) {
                   learnerType = LearnerType.OBSERVER;
             }
 
+            learnerMaster.registerLearnerHandlerBean(this, sock);
+
             long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
 
             long peerLastZxid;
             StateSummary ss = null;
             long zxid = qp.getZxid();
-            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
+            long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
             long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
 
             if (this.getVersion() < 0x10000) {
@@ -427,7 +433,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 long epoch = ZxidUtils.getEpochFromZxid(zxid);
                 ss = new StateSummary(epoch, zxid);
                 // fake the message
-                leader.waitForEpochAck(this.getSid(), ss);
+                learnerMaster.waitForEpochAck(this.getSid(), ss);
             } else {
                 byte ver[] = new byte[4];
                 ByteBuffer.wrap(ver).putInt(0x10000);
@@ -443,21 +449,21 @@ public class LearnerHandler extends ZooKeeperThread {
 				}
                 ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
-                leader.waitForEpochAck(this.getSid(), ss);
+                learnerMaster.waitForEpochAck(this.getSid(), ss);
             }
             peerLastZxid = ss.getLastZxid();
 
             // Take any necessary action if we need to send TRUNC or DIFF
             // startForwarding() will be called in all cases
-            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
+            boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
 
             /* if we are not truncating or sending a diff just send a snapshot */
             if (needSnap) {
                 boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                 LearnerSnapshot snapshot =
-                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
+                        learnerMaster.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                 try {
-                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+                    long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                     oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                     bufferedOutput.flush();
 
@@ -470,7 +476,7 @@ public class LearnerHandler extends ZooKeeperThread {
                             snapshot.getConcurrentSnapshotNumber(),
                             snapshot.isEssential() ? "exempt" : "not exempt");
                     // Dump data to peer
-                    leader.zk.getZKDatabase().serializeSnapshot(oa);
+                    learnerMaster.getZKDatabase().serializeSnapshot(oa);
                     oa.writeString("BenWasHere", "signature");
                     bufferedOutput.flush();
                 } finally {
@@ -492,8 +498,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 oa.writeRecord(newLeaderQP, "packet");
             } else {
                 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
-                                .toString().getBytes(), null);
+                        newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
                 queuedPackets.add(newLeaderQP);
             }
             bufferedOutput.flush();
@@ -503,7 +508,7 @@ public class LearnerHandler extends ZooKeeperThread {
 
             /*
              * Have to wait for the first ACK, wait until
-             * the leader is ready, and only then we can
+             * the learnerMaster is ready, and only then we can
              * start processing messages.
              */
             qp = new QuorumPacket();
@@ -517,21 +522,18 @@ public class LearnerHandler extends ZooKeeperThread {
             if(LOG.isDebugEnabled()){
             	LOG.debug("Received NEWLEADER-ACK message from " + sid);
             }
-            leader.waitForNewLeaderAck(getSid(), qp.getZxid());
+            learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
 
             syncLimitCheck.start();
 
             // now that the ack has been processed expect the syncLimit
-            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
+            sock.setSoTimeout(learnerMaster.syncTimeout());
 
             /*
-             * Wait until leader starts up
+             * Wait until learnerMaster starts up
              */
-            synchronized(leader.zk){
-                while(!leader.zk.isRunning() && !this.isInterrupted()){
-                    leader.zk.wait(20);
-                }
-            }
+            learnerMaster.waitForStartup();
+
             // Mutation packets will be queued during the serialize,
             // so we need to mark when the peer can actually start
             // using the data
@@ -550,7 +552,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 if (LOG.isTraceEnabled()) {
                     ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                 }
-                tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
+                tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();
 
 
                 ByteBuffer bb;
@@ -566,7 +568,7 @@ public class LearnerHandler extends ZooKeeperThread {
                         }
                     }
                     syncLimitCheck.updateAck(qp.getZxid());
-                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
+                    learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                     break;
                 case Leader.PING:
                     // Process the touches
@@ -576,38 +578,11 @@ public class LearnerHandler extends ZooKeeperThread {
                     while (dis.available() > 0) {
                         long sess = dis.readLong();
                         int to = dis.readInt();
-                        leader.zk.touch(sess, to);
+                        learnerMaster.touch(sess, to);
                     }
                     break;
                 case Leader.REVALIDATE:
-                    bis = new ByteArrayInputStream(qp.getData());
-                    dis = new DataInputStream(bis);
-                    long id = dis.readLong();
-                    int to = dis.readInt();
-                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                    DataOutputStream dos = new DataOutputStream(bos);
-                    dos.writeLong(id);
-                    boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
-                    if (valid) {
-                        try {
-                            //set the session owner
-                            // as the follower that
-                            // owns the session
-                            leader.zk.setOwner(id, this);
-                        } catch (SessionExpiredException e) {
-                            LOG.error("Somehow session " + Long.toHexString(id) +
-                                    " expired right after being renewed! (impossible)", e);
-                        }
-                    }
-                    if (LOG.isTraceEnabled()) {
-                        ZooTrace.logTraceMessage(LOG,
-                                                 ZooTrace.SESSION_TRACE_MASK,
-                                                 "Session 0x" + Long.toHexString(id)
-                                                 + " is valid: "+ valid);
-                    }
-                    dos.writeBoolean(valid);
-                    qp.setData(bos.toByteArray());
-                    queuedPackets.add(qp);
+                    learnerMaster.revalidateSession(qp, this);
                     break;
                 case Leader.REQUEST:
                     bb = ByteBuffer.wrap(qp.getData());
@@ -622,7 +597,7 @@ public class LearnerHandler extends ZooKeeperThread {
                         si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                     }
                     si.setOwner(this);
-                    leader.zk.submitLearnerRequest(si);
+                    learnerMaster.submitLearnerRequest(si);
                     break;
                 default:
                     LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
@@ -646,9 +621,7 @@ public class LearnerHandler extends ZooKeeperThread {
         } catch (SnapshotThrottleException e) {
             LOG.error("too many concurrent snapshots: " + e);
         } finally {
-            LOG.warn("******* GOODBYE "
-                    + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
-                    + " ********");
+            LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
             shutdown();
         }
     }
@@ -681,19 +654,18 @@ public class LearnerHandler extends ZooKeeperThread {
      * and setup follower to receive packets from commit processor
      *
      * @param peerLastZxid
-     * @param db
-     * @param leader
+     * @param learnerMaster
      * @return true if snapshot transfer is needed.
      */
-    public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
+    boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
         /*
          * When leader election is completed, the leader will set its
          * lastProcessedZxid to be (epoch < 32). There will be no txn associated
          * with this zxid.
          *
          * The learner will set its lastProcessedZxid to the same value if
-         * it get DIFF or SNAP from the leader. If the same learner come
-         * back to sync with leader using this zxid, we will never find this
+         * it get DIFF or SNAP from the learnerMaster. If the same learner come
+         * back to sync with learnerMaster using this zxid, we will never find this
          * zxid in our history. In this case, we will ignore TRUNC logic and
          * always send DIFF if we have old enough history
          */
@@ -701,6 +673,7 @@ public class LearnerHandler extends ZooKeeperThread {
         // Keep track of the latest zxid which already queued
         long currentZxid = peerLastZxid;
         boolean needSnap = true;
+        ZKDatabase db = learnerMaster.getZKDatabase();
         boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
         ReentrantReadWriteLock lock = db.getLogLock();
         ReadLock rl = lock.readLock();
@@ -721,7 +694,7 @@ public class LearnerHandler extends ZooKeeperThread {
             if (db.getCommittedLog().isEmpty()) {
                 /*
                  * It is possible that committedLog is empty. In that case
-                 * setting these value to the latest txn in leader db
+                 * setting these value to the latest txn in learnerMaster db
                  * will reduce the case that we need to handle
                  *
                  * Here is how each case handle by the if block below
@@ -737,7 +710,7 @@ public class LearnerHandler extends ZooKeeperThread {
              * Here are the cases that we want to handle
              *
              * 1. Force sending snapshot (for testing purpose)
-             * 2. Peer and leader is already sync, send empty diff
+             * 2. Peer and learnerMaster is already sync, send empty diff
              * 3. Follower has txn that we haven't seen. This may be old leader
              *    so we need to send TRUNC. However, if peer has newEpochZxid,
              *    we cannot send TRUNC since the follower has no txnlog
@@ -750,7 +723,7 @@ public class LearnerHandler extends ZooKeeperThread {
              */
 
             if (forceSnapSync) {
-                // Force leader to use snapshot to sync with follower
+                // Force learnerMaster to use snapshot to sync with follower
                 LOG.warn("Forcing snapshot sync - should not see this in production");
             } else if (lastProcessedZxid == peerLastZxid) {
                 // Follower is already sync with us, send empty diff
@@ -811,9 +784,12 @@ public class LearnerHandler extends ZooKeeperThread {
                         Long.toHexString(peerLastZxid),
                         txnLogSyncEnabled);
             }
+            if (needSnap) {
+                currentZxid = db.getDataTreeLastProcessedZxid();
+            }
             LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
                       " for peer sid: " +  getSid());
-            leaderLastZxid = leader.startForwarding(this, currentZxid);
+            leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
         } finally {
             rl.unlock();
         }
@@ -884,7 +860,7 @@ public class LearnerHandler extends ZooKeeperThread {
                    queueOpPacket(Leader.DIFF, lastCommittedZxid);
                    needOpPacket = false;
                 } else if (packetZxid > peerLastZxid  ) {
-                    // Peer have some proposals that the leader hasn't seen yet
+                    // Peer have some proposals that the learnerMaster hasn't seen yet
                     // it may used to be a leader
                     if (ZxidUtils.getEpochFromZxid(packetZxid) !=
                             ZxidUtils.getEpochFromZxid(peerLastZxid)) {
@@ -947,7 +923,8 @@ public class LearnerHandler extends ZooKeeperThread {
             LOG.warn("Ignoring unexpected exception during socket close", e);
         }
         this.interrupt();
-        leader.removeLearnerHandler(this);
+        learnerMaster.removeLearnerHandler(this);
+        learnerMaster.unregisterLearnerHandlerBean(this);
     }
 
     public long tickOfNextAckDeadline() {
@@ -955,7 +932,7 @@ public class LearnerHandler extends ZooKeeperThread {
     }
 
     /**
-     * ping calls from the leader to the peers
+     * ping calls from the learnerMaster to the peers
      */
     public void ping() {
         // If learner hasn't sync properly yet, don't send ping packet
@@ -965,9 +942,7 @@ public class LearnerHandler extends ZooKeeperThread {
         }
         long id;
         if (syncLimitCheck.check(System.nanoTime())) {
-            synchronized(leader) {
-                id = leader.lastProposed;
-            }
+            id = learnerMaster.getLastProposed();
             QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
             queuePacket(ping);
         } else {
@@ -990,9 +965,18 @@ public class LearnerHandler extends ZooKeeperThread {
         queuedPackets.add(p);
     }
 
+    static long packetSize(QuorumPacket p) {
+        /* Approximate base size of QuorumPacket: int + long + byte[] + List */
+        long size = 4 + 8 + 8 + 8;
+        byte[] data = p.getData();
+        if (data != null) {
+            size += data.length;
+        }
+        return size;
+    }
+
     public boolean synced() {
-        return isAlive()
-        && leader.self.tick.get() <= tickOfNextAckDeadline;
+        return isAlive() && learnerMaster.getCurrentTick() <= tickOfNextAckDeadline;
     }
 
     /**
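
Note on the tick arithmetic that LearnerHandler now obtains through LearnerMaster (syncTimeout, getTickOfNextAckDeadline, getTickOfInitialAckDeadline, getCurrentTick): the formulas are the ones shown in the Leader.java hunk earlier in this message. A minimal sketch with illustrative values follows. The class TickDeadlineSketch is hypothetical, the numbers tickTime=2000, initLimit=10 and syncLimit=5 are example settings rather than values taken from this commit, and the isAlive() check that synced() also performs is omitted.

```java
// Hypothetical illustration of the ack-deadline arithmetic; not ZooKeeper code.
final class TickDeadlineSketch {
    private final int tickTime;  // milliseconds per tick
    private final int initLimit; // ticks allowed for the initial sync
    private final int syncLimit; // ticks allowed between acks once synced

    TickDeadlineSketch(int tickTime, int initLimit, int syncLimit) {
        this.tickTime = tickTime;
        this.initLimit = initLimit;
        this.syncLimit = syncLimit;
    }

    // Socket timeout in milliseconds once the learner has finished syncing.
    int syncTimeout() {
        return tickTime * syncLimit;
    }

    // Deadline tick for the next ack during normal operation.
    int tickOfNextAckDeadline(int currentTick) {
        return currentTick + syncLimit;
    }

    // Deadline tick for the first ack while the learner is bootstrapping.
    int tickOfInitialAckDeadline(int currentTick) {
        return currentTick + initLimit + syncLimit;
    }

    // A learner counts as synced while the current tick has not passed its deadline.
    static boolean withinDeadline(int currentTick, int deadlineTick) {
        return currentTick <= deadlineTick;
    }

    public static void main(String[] args) {
        TickDeadlineSketch t = new TickDeadlineSketch(2000, 10, 5);
        System.out.println("syncTimeout ms: " + t.syncTimeout());
        System.out.println("initial deadline at tick 7: " + t.tickOfInitialAckDeadline(7));
        System.out.println("next deadline at tick 7: " + t.tickOfNextAckDeadline(7));
    }
}
```

Moving these computations behind the interface lets a LearnerMaster implementation other than Leader supply its own tick source without LearnerHandler reaching into leader.self.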

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java
new file mode 100644
index 0000000..a97a880
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public class LearnerHandlerBean implements LearnerHandlerMXBean, ZKMBeanInfo{
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerHandlerBean.class);
+
+    private final LearnerHandler learnerHandler;
+    private final String remoteAddr;
+
+    public LearnerHandlerBean(final LearnerHandler learnerHandler, final Socket socket) {
+        this.learnerHandler = learnerHandler;
+        InetSocketAddress sockAddr = (InetSocketAddress) socket.getRemoteSocketAddress();
+        if (sockAddr == null) {
+            this.remoteAddr = "Unknown";
+        } else {
+            this.remoteAddr = sockAddr.getAddress().getHostAddress() + ":" + sockAddr.getPort();
+        }
+    }
+
+    @Override
+    public String getName() {
+        return MBeanRegistry.getInstance().makeFullPath("Learner_Connections", ObjectName.quote(remoteAddr),
+                String.format("\"id:%d\"", learnerHandler.getSid()));
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public void terminateConnection() {
+        LOG.info("terminating learner handler connection on demand " + toString());
+        learnerHandler.shutdown();
+    }
+
+    @Override
+    public String toString() {
+        return "LearnerHandlerBean{remoteIP=" + remoteAddr + ",ServerId=" + learnerHandler.getSid() + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java
new file mode 100644
index 0000000..3d85a53
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * This MBean represents a server connection for a learner.
+ */
+public interface LearnerHandlerMXBean {
+    /**
+     * Terminate the connection. The learner will attempt to reconnect to
+     * the leader or to the next ObserverMaster if that feature is enabled.
+     */
+    public void terminateConnection();
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
new file mode 100644
index 0000000..3bffacf
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+/**
+ * interface for keeping Observers in sync
+ */
+public interface LearnerMaster {
+    /**
+     * start tracking a learner handler
+     * @param learnerHandler to track
+     */
+    void addLearnerHandler(LearnerHandler learnerHandler);
+
+    /**
+     * stop tracking a learner handler
+     * @param learnerHandler to drop
+     */
+    void removeLearnerHandler(LearnerHandler learnerHandler);
+
+    /**
+     * wait for the leader of the new epoch to be confirmed by followers
+     * @param sid learner id
+     * @param ss
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException;
+
+    /**
+     * snapshot throttler
+     * @return snapshot throttler
+     */
+    LearnerSnapshotThrottler getLearnerSnapshotThrottler();
+
+    /**
+     * wait for server to start
+     * @throws InterruptedException
+     */
+    void waitForStartup() throws InterruptedException;
+
+    /**
+     * get the first zxid of the next epoch
+     * @param sid learner id
+     * @param lastAcceptedEpoch
+     * @return
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException;
+
+    /**
+     * ZKDatabase
+     * @return ZKDatabase
+     */
+    ZKDatabase getZKDatabase();
+
+    /**
+     * wait for new leader to settle
+     * @param sid id of learner
+     * @param zxid zxid at learner
+     * @throws InterruptedException
+     */
+    void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException;
+
+    /**
+     * last proposed zxid
+     * @return last proposed zxid
+     */
+    long getLastProposed();
+
+    /**
+     * the current tick
+     * @return the current tick
+     */
+    int getCurrentTick();
+
+    /**
+     * time allowed for sync response
+     * @return time allowed for sync response
+     */
+    int syncTimeout();
+
+    /**
+     * next deadline tick marking observer sync (steady state)
+     * @return next deadline tick marking observer sync (steady state)
+     */
+    int getTickOfNextAckDeadline();
+
+    /**
+     * deadline tick marking observer sync (initial)
+     * @return deadline tick marking observer sync (initial)
+     */
+    int getTickOfInitialAckDeadline();
+
+    /**
+     * decrement follower count
+     * @return previous follower count
+     */
+    long getAndDecrementFollowerCounter();
+
+    /**
+     * handle ack packet
+     * @param sid learner id
+     * @param zxid packet zxid
+     * @param localSocketAddress forwarder's address
+     */
+    void processAck(long sid, long zxid, SocketAddress localSocketAddress);
+
+    /**
+     * mark session as alive
+     * @param sess session id
+     * @param to timeout
+     */
+    void touch(long sess, int to);
+
+    /**
+     * handle revalidate packet
+     * @param qp session packet
+     * @param learnerHandler learner
+     * @throws IOException
+     */
+    void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException;
+
+    /**
+     * proxy request from learner to server
+     * @param si request
+     */
+    void submitLearnerRequest(Request si);
+
+    /**
+     * begin forwarding packets to learner handler
+     * @param learnerHandler learner
+     * @param lastSeenZxid zxid of learner
+     * @return last zxid forwarded
+     */
+    long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid);
+
+    /**
+     * version of current quorum verifier
+     * @return version of current quorum verifier
+     */
+    long getQuorumVerifierVersion();
+
+    /**
+     * look up a server in the current view
+     * @param sid server id
+     * @return server information in the view
+     */
+    String getPeerInfo(long sid);
+
+    /**
+     * identifier of current quorum verifier for new leader
+     * @return identifier of current quorum verifier for new leader
+     */
+    byte[] getQuorumVerifierBytes();
+
+    QuorumAuthServer getQuorumAuthServer();
+
+    /**
+     * registers the handler's bean
+     * @param learnerHandler handler
+     * @param socket connection to learner
+     */
+    void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket);
+
+    /**
+     * unregisters the handler's bean
+     * @param learnerHandler handler
+     */
+    void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index f0f724e..b688e03 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ObserverBean;
@@ -29,6 +30,9 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Observers are peers that do not take part in the atomic broadcast protocol.
@@ -41,6 +45,31 @@ import org.apache.zookeeper.txn.TxnHeader;
  */
 public class Observer extends Learner{
 
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    /**
+     * When an observer loses its connection with the leader, it waits a random
+     * time between 0 and the specified value (in ms) before trying to reconnect,
+     * so that the entire observer fleet won't try to run leader election and
+     * reconnect to the leader at once. Default value is zero.
+     */
+    public static final String OBSERVER_RECONNECT_DELAY_MS =
+            "zookeeper.observer.reconnectDelayMs";
+
+    private static final long reconnectDelayMs;
+
+    static {
+        reconnectDelayMs = Long.getLong(OBSERVER_RECONNECT_DELAY_MS, 0);
+        LOG.info(OBSERVER_RECONNECT_DELAY_MS + " = " + reconnectDelayMs);
+    }
+
+    /**
+     * next learner master to try, when specified
+     */
+    private final static AtomicReference<QuorumPeer.QuorumServer> nextLearnerMaster = new AtomicReference<>();
+
+    private QuorumPeer.QuorumServer currentLearnerMaster = null;
+
     Observer(QuorumPeer self,ObserverZooKeeperServer observerZooKeeperServer) {
         this.self = self;
         this.zk=observerZooKeeperServer;
@@ -63,17 +92,16 @@ public class Observer extends Learner{
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
-            QuorumServer leaderServer = findLeader();
-            LOG.info("Observing " + leaderServer.addr);
+            QuorumServer master = findLearnerMaster();
             try {
-                connectToLeader(leaderServer.addr, leaderServer.hostname);
+                connectToLeader(master.addr, master.hostname);
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                 if (self.isReconfigStateChange())
                    throw new Exception("learned about role change");
  
                 syncWithLeader(newLeaderZxid);
                 QuorumPacket qp = new QuorumPacket();
-                while (this.isRunning()) {
+                while (this.isRunning() && nextLearnerMaster.get() == null) {
                     readPacket(qp);
                     processPacket(qp);
                 }
@@ -89,10 +117,29 @@ public class Observer extends Learner{
                 pendingRevalidations.clear();
             }
         } finally {
+            currentLearnerMaster = null;
             zk.unregisterJMX(this);
         }
     }
 
+    private QuorumServer findLearnerMaster() {
+        QuorumPeer.QuorumServer prescribedLearnerMaster =  nextLearnerMaster.getAndSet(null);
+        if (prescribedLearnerMaster != null && self.validateLearnerMaster(Long.toString(prescribedLearnerMaster.id)) == null) {
+            LOG.warn("requested next learner master {} is no longer valid", prescribedLearnerMaster);
+            prescribedLearnerMaster = null;
+        }
+        final QuorumPeer.QuorumServer master = (prescribedLearnerMaster == null) ?
+                self.findLearnerMaster(findLeader()) :
+                prescribedLearnerMaster;
+        currentLearnerMaster = master;
+        if (master == null) {
+            LOG.warn("No learner master found");
+        } else {
+            LOG.info("Observing new learner master sid={} addr={}", master.id, master.addr);
+        }
+        return master;
+    }
+
     /**
      * Controls the response of an observer to the receipt of a quorumpacket
      * @param qp
@@ -162,5 +209,48 @@ public class Observer extends Learner{
         LOG.info("shutdown called", new Exception("shutdown Observer"));
         super.shutdown();
     }
+
+    static void waitForReconnectDelay() {
+        if (reconnectDelayMs > 0) {
+            long randomDelay = (long) (reconnectDelayMs * Math.random());
+            LOG.info("Waiting for " + randomDelay
+                    + " ms before reconnecting with the leader");
+            try {
+                Thread.sleep(randomDelay);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting: " + e.getMessage());
+            }
+        }
+    }
+
+    public long getLearnerMasterId() {
+        QuorumPeer.QuorumServer current = currentLearnerMaster;
+        return current == null ? -1 : current.id;
+    }
+
+    /**
+     * Prompts the Observer to disconnect from its current learner master and reconnect
+     * to the specified server. If that connection attempt fails, the Observer will
+     * fail over to the next available learner master.
+     */
+    public boolean setLearnerMaster(String learnerMaster) {
+        final QuorumPeer.QuorumServer server = self.validateLearnerMaster(learnerMaster);
+        if (server == null) {
+            return false;
+        } else if (server.equals(currentLearnerMaster)) {
+            LOG.info("Already connected to requested learner master sid={} addr={}",
+                    server.id, server.addr);
+            return true;
+        } else {
+            LOG.info("Requesting disconnect and reconnect to new learner master sid={} addr={}",
+                    server.id, server.addr);
+            nextLearnerMaster.set(server);
+            return true;
+        }
+    }
+
+    public QuorumPeer.QuorumServer getCurrentLearnerMaster() {
+        return currentLearnerMaster;
+    }
 }
 

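The Observer.java changes above combine two ideas: a randomized reconnect delay and a one-shot "next learner master" request handed to the serving loop through an AtomicReference. The following is a minimal standalone sketch of that pattern, not the ZooKeeper code itself. The class name ObserverReconnectSketch, its method names, and the use of plain strings in place of QuorumPeer.QuorumServer are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; names here do not exist in ZooKeeper.
public class ObserverReconnectSketch {
    // Pending switch request; null means "stay with the current master".
    private static final AtomicReference<String> nextMaster = new AtomicReference<>();

    /** Picks a delay in [0, maxDelayMs); returns 0 when the delay is disabled. */
    public static long jitteredDelayMs(long maxDelayMs) {
        return maxDelayMs > 0 ? (long) (maxDelayMs * Math.random()) : 0L;
    }

    /** Records a switch request; the serving loop notices it on its next check. */
    public static void requestSwitch(String master) {
        nextMaster.set(master);
    }

    /** Loop condition: keep reading packets only while no switch is pending. */
    public static boolean shouldKeepServing() {
        return nextMaster.get() == null;
    }

    /** Consumes the pending request at most once, else uses the fallback. */
    public static String chooseMaster(String fallback) {
        String requested = nextMaster.getAndSet(null);
        return requested != null ? requested : fallback;
    }

    public static void main(String[] args) throws InterruptedException {
        requestSwitch("server-2");                  // operator asks for a switch
        System.out.println(shouldKeepServing());    // the loop would now exit
        Thread.sleep(jitteredDelayMs(50));          // spread out reconnects
        System.out.println(chooseMaster("leader")); // honors the request once
        System.out.println(chooseMaster("leader")); // then falls back
    }
}
```

Because getAndSet(null) clears the request as it reads it, a failed connection to the requested server is not retried on the next pass; the sketch falls back to its default, which matches the failover behavior described in the setLearnerMaster Javadoc.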
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java
index 2c1799a..5145e72 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java
@@ -34,4 +34,16 @@ public interface ObserverMXBean extends ZooKeeperServerMXBean {
      * @return socket address
      */
     public String getQuorumAddress();
+
+    /**
+     * @return address of the current learner master
+     */
+    public String getLearnerMaster();
+
+    /**
+     * requests the Observer switch to a new learner master
+     *
+     * @param learnerMaster address of the desired learner master
+     */
+    public void setLearnerMaster(String learnerMaster);
 }
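
Since ObserverMXBean now declares a matching getLearnerMaster/setLearnerMaster pair, JMX exposes it as a single read-write attribute named LearnerMaster. The sketch below shows how such an attribute can be written and read back through an MBeanServer. It uses a stand-in bean rather than ZooKeeper's ObserverBean: the DemoObserverMXBean interface, the DemoObserver class, and the ObjectName "sketch.example:type=DemoObserver" are hypothetical, and the name under which ZooKeeper registers the real bean is not shown in this diff.

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical sketch; the bean and ObjectName are illustrative only.
public class LearnerMasterJmxSketch {
    /** Stand-in for the two methods added to ObserverMXBean. */
    public interface DemoObserverMXBean {
        String getLearnerMaster();
        void setLearnerMaster(String learnerMaster);
    }

    /** Trivial implementation that just remembers the requested master. */
    public static class DemoObserver implements DemoObserverMXBean {
        private volatile String learnerMaster = "";

        @Override
        public String getLearnerMaster() {
            return learnerMaster;
        }

        @Override
        public void setLearnerMaster(String learnerMaster) {
            this.learnerMaster = learnerMaster;
        }
    }

    /** Registers the demo bean, sets the attribute via JMX, reads it back. */
    public static String switchViaJmx(String target) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("sketch.example:type=DemoObserver");
        if (mbs.isRegistered(name)) {
            mbs.unregisterMBean(name);
        }
        mbs.registerMBean(new DemoObserver(), name);
        try {
            // The getter/setter pair appears as the attribute "LearnerMaster".
            mbs.setAttribute(name, new Attribute("LearnerMaster", target));
            return (String) mbs.getAttribute(name, "LearnerMaster");
        } finally {
            mbs.unregisterMBean(name);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(switchViaJmx("server-2"));
    }
}
```

Against a live server, the same setAttribute call would typically go through a remote MBeanServerConnection instead of the platform MBeanServer.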

