zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3398: Learner.connectToLeader() may take too long to time-out
Date Fri, 12 Jul 2019 15:01:51 GMT
This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 43ce772  ZOOKEEPER-3398: Learner.connectToLeader() may take too long to time-out
43ce772 is described below

commit 43ce772db000721546fcd13dd8523002dfa97741
Author: Vladimir Ivic <vladimir.ivic@me.com>
AuthorDate: Fri Jul 12 17:01:44 2019 +0200

    ZOOKEEPER-3398: Learner.connectToLeader() may take too long to time-out
    
    After leader election happens, the followers will connect to the leader which is facilitated by the Learner.connectToLeader() method.
    
    Learner.connectToLeader() is relying on the initLimit configuration value to time-out in case the network connection is unreliable. This config may have a high value that could leave the ensemble retrying and waiting in the state of not having quorum for too long. The follower will retry up to 5 times.
    
    This patch introduces a new configuration directive that will allow Zookeeper to use different time-out value `connectToLeaderLimit` which then could be set to lower value than `initLimit`.
    
    Test plan:
    - ant clean
    - ant test-core-java
    
    NOTE: Lots of whitespace changes, hope it helps.
    
    Author: Vladimir Ivic <vladimir.ivic@me.com>
    
    Reviewers: eolivelli@apache.org, hanm@apache.org, andor@apache.org
    
    Closes #953 from vladimirivic/ZOOKEEPER-3398 and squashes the following commits:
    
    da4ecd055 [Vladimir Ivic] Removed redundant test, chaning LearnerTest.connectToLearnerMasterLimitTest() params and assertions
    6c413311c [Vladimir Ivic] Updating the tests with the new timeout parameter
    5a89cbd7e [Vladimir Ivic] Rewriting timeout logic inside Leader.connectToLeader
    99c065616 [Vladimir Ivic] Adding config connectToLearnerMasterLimit to prevent long connect timeout
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  6 ++++
 .../apache/zookeeper/test/system/BaseSysTest.java  |  3 +-
 .../zookeeper/test/system/QuorumPeerInstance.java  |  3 +-
 .../apache/zookeeper/server/quorum/Learner.java    | 38 +++++++++++++---------
 .../apache/zookeeper/server/quorum/QuorumPeer.java | 36 +++++++++++++++-----
 .../zookeeper/server/quorum/QuorumPeerConfig.java  |  4 +++
 .../zookeeper/server/quorum/QuorumPeerMain.java    |  1 +
 .../zookeeper/server/quorum/CnxManagerTest.java    | 18 +++++-----
 .../quorum/FLEBackwardElectionRoundTest.java       |  6 ++--
 .../server/quorum/FLELostMessageTest.java          |  4 +--
 .../server/quorum/FLEOutOfElectionTest.java        |  2 +-
 .../zookeeper/server/quorum/LearnerTest.java       | 22 +++++++++++++
 .../zookeeper/server/quorum/QuorumPeerTest.java    |  5 +--
 .../server/quorum/QuorumPeerTestBase.java          |  2 ++
 .../quorum/ReconfigDuringLeaderSyncTest.java       |  7 ++--
 .../org/apache/zookeeper/test/FLENewEpochTest.java |  4 +--
 .../apache/zookeeper/test/FLEPredicateTest.java    |  2 +-
 .../org/apache/zookeeper/test/FLERestartTest.java  |  4 +--
 .../java/org/apache/zookeeper/test/FLETest.java    | 10 +++---
 .../apache/zookeeper/test/FLEZeroWeightTest.java   |  2 +-
 .../zookeeper/test/HierarchicalQuorumTest.java     | 11 ++++---
 .../java/org/apache/zookeeper/test/QuorumBase.java | 22 +++++++------
 .../java/org/apache/zookeeper/test/QuorumUtil.java |  9 +++--
 .../org/apache/zookeeper/test/TruncateTest.java    |  7 ++--
 24 files changed, 151 insertions(+), 77 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index b0b07dc..bfff07b 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -913,6 +913,12 @@ of servers -- that is, when deploying clusters of servers.
     connect and sync to a leader. Increased this value as needed, if
     the amount of data managed by ZooKeeper is large.
 
+* *connectToLearnerMasterLimit* :
+    (Java system property: zookeeper.**connectToLearnerMasterLimit**)
+    Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
+    connect to the leader after leader election. Defaults to the value of initLimit. 
+    Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
+        
 * *leaderServes* :
     (Java system property: zookeeper.**leaderServes**)
     Leader accepts client connections. Default value is "yes".
diff --git a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
index 73669b0..8856282 100644
--- a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
+++ b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
@@ -176,11 +176,12 @@ public class BaseSysTest {
     final static int tickTime = 2000;
     final static int initLimit = 3;
     final static int syncLimit = 3;
+    final static int connectToLearnerMasterLimit = 3;
 
     public void startServer(int index) throws IOException {
         int port = fakeBasePort+10+index;
         if (fakeMachines) {
-            qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit);
+            qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             qps[index].start();
         } else {
             try {
diff --git a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
index cd66b43..782deaf 100644
--- a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
+++ b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
@@ -43,6 +43,7 @@ class QuorumPeerInstance implements Instance {
 
     private static final int syncLimit = 3;
     private static final int initLimit = 3;
+    private static final int connectToLearnerMasterLimit = 3;
     private static final int tickTime = 2000;
     String serverHostPort;
     int serverId;
@@ -191,7 +192,7 @@ class QuorumPeerInstance implements Instance {
                     return;
                 }
                 System.err.println("SnapDir = " + snapDir + " LogDir = " + logDir);
-                peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit);
+                peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
                 peer.start();
                 for(int i = 0; i < 5; i++) {
                     Thread.sleep(500);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 51979aa..168c44b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -241,7 +241,7 @@ public class Learner {
     throws IOException {
         sock.connect(addr, timeout);
     }
-
+    
     /**
      * Establish a connection with the LearnerMaster found by findLearnerMaster.
      * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
@@ -256,41 +256,49 @@ public class Learner {
             throws IOException, InterruptedException, X509Exception {
         this.sock = createSocket();
 
-        int initLimitTime = self.tickTime * self.initLimit;
-        int remainingInitLimitTime;
+        // leader connection timeout defaults to tickTime * initLimit
+        int connectTimeout = self.tickTime * self.initLimit;
+
+        // but if connectToLearnerMasterLimit is specified, use that value to calculate
+        // timeout instead of using the initLimit value
+        if (self.connectToLearnerMasterLimit > 0) {
+          connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
+        }
+
+        int remainingTimeout;
         long startNanoTime = nanoTime();
 
         for (int tries = 0; tries < 5; tries++) {
             try {
                 // recalculate the init limit time because retries sleep for 1000 milliseconds
-                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
-                if (remainingInitLimitTime <= 0) {
-                    LOG.error("initLimit exceeded on retries.");
-                    throw new IOException("initLimit exceeded on retries.");
+                remainingTimeout = connectTimeout - (int)((nanoTime() - startNanoTime) / 1000000);
+                if (remainingTimeout <= 0) {
+                    LOG.error("connectToLeader exceeded on retries.");
+                    throw new IOException("connectToLeader exceeded on retries.");
                 }
-
-                sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
+                
+                sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
                 if (self.isSslQuorum())  {
                     ((SSLSocket) sock).startHandshake();
                 }
                 sock.setTcpNoDelay(nodelay);
                 break;
             } catch (IOException e) {
-                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
+                remainingTimeout = connectTimeout - (int)((nanoTime() - startNanoTime) / 1000000);
 
-                if (remainingInitLimitTime <= 1000) {
-                    LOG.error("Unexpected exception, initLimit exceeded. tries=" + tries +
-                             ", remaining init limit=" + remainingInitLimitTime +
+                if (remainingTimeout <= 1000) {
+                    LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries +
+                             ", remaining init limit=" + remainingTimeout +
                              ", connecting to " + addr,e);
                     throw e;
                 } else if (tries >= 4) {
                     LOG.error("Unexpected exception, retries exceeded. tries=" + tries +
-                             ", remaining init limit=" + remainingInitLimitTime +
+                             ", remaining init limit=" + remainingTimeout +
                              ", connecting to " + addr,e);
                     throw e;
                 } else {
                     LOG.warn("Unexpected exception, tries=" + tries +
-                            ", remaining init limit=" + remainingInitLimitTime +
+                            ", remaining init limit=" + remainingTimeout +
                             ", connecting to " + addr,e);
                     this.sock = createSocket();
                 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index f3217af..062f259 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -587,6 +587,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     protected volatile int syncLimit;
     
     /**
+     * The number of ticks that can pass before retrying to connect to learner master
+     */
+    protected volatile int connectToLearnerMasterLimit;
+    
+    /**
      * Enables/Disables sync request processor. This option is enabled
      * by default and is to be used with observers.
      */
@@ -899,16 +904,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
-            long myid, int tickTime, int initLimit, int syncLimit,
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
             ServerCnxnFactory cnxnFactory) throws IOException {
         this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
-                initLimit, syncLimit, false, cnxnFactory,
+                initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory,
                 new QuorumMaj(quorumPeers));
     }
 
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
-            long myid, int tickTime, int initLimit, int syncLimit,
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
             boolean quorumListenOnAllIPs,
             ServerCnxnFactory cnxnFactory,
             QuorumVerifier quorumConfig) throws IOException {
@@ -919,6 +924,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         this.tickTime = tickTime;
         this.initLimit = initLimit;
         this.syncLimit = syncLimit;
+        this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
         this.quorumListenOnAllIPs = quorumListenOnAllIPs;
         this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
         this.zkDb = new ZKDatabase(this.logFactory);
@@ -1049,7 +1055,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
       }
       return count;
     }
-
+    
     /**
 
      * This constructor is only used by the existing unit test code.
@@ -1057,10 +1063,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
             File logDir, int clientPort, int electionAlg,
-            long myid, int tickTime, int initLimit, int syncLimit)
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit)
         throws IOException
     {
-        this(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+        this(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false,
                 ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
                 new QuorumMaj(quorumPeers));
     }
@@ -1071,12 +1077,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
             File logDir, int clientPort, int electionAlg,
-            long myid, int tickTime, int initLimit, int syncLimit,
+            long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
             QuorumVerifier quorumConfig)
         throws IOException
     {
         this(quorumPeers, snapDir, logDir, electionAlg,
-                myid,tickTime, initLimit,syncLimit, false,
+                myid,tickTime, initLimit,syncLimit, connectToLearnerMasterLimit, false,
                 ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
                 quorumConfig);
     }
@@ -1785,6 +1791,20 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         this.syncLimit = syncLimit;
     }
     
+    /**
+     * Get the connectToLearnerMasterLimit
+     */
+    public int getConnectToLearnerMasterLimit() {
+        return connectToLearnerMasterLimit;
+    }
+    
+    /**
+     * Set the connectToLearnerMasterLimit
+     */
+    public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
+        LOG.info("connectToLearnerMasterLimit set to " + connectToLearnerMasterLimit);
+        this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
+    }
     
     /**
      * The syncEnabled can also be set via a system property.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index b0d2800..b1bce11 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -94,6 +94,7 @@ public class QuorumPeerConfig {
 
     protected int initLimit;
     protected int syncLimit;
+    protected int connectToLearnerMasterLimit;
     protected int electionAlg = 3;
     protected int electionPort = 2182;
     protected boolean quorumListenOnAllIPs = false;
@@ -306,6 +307,8 @@ public class QuorumPeerConfig {
                 initLimit = Integer.parseInt(value);
             } else if (key.equals("syncLimit")) {
                 syncLimit = Integer.parseInt(value);
+            } else if (key.equals("connectToLearnerMasterLimit")) {
+                connectToLearnerMasterLimit = Integer.parseInt(value);
             } else if (key.equals("electionAlg")) {
                 electionAlg = Integer.parseInt(value);
                 if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
@@ -834,6 +837,7 @@ public class QuorumPeerConfig {
 
     public int getInitLimit() { return initLimit; }
     public int getSyncLimit() { return syncLimit; }
+    public int getConnectToLearnerMasterLimit() { return connectToLearnerMasterLimit; }
     public int getElectionAlg() { return electionAlg; }
     public int getElectionPort() { return electionPort; }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index 79293ea..3be0449 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -189,6 +189,7 @@ public class QuorumPeerMain {
           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
           quorumPeer.setInitLimit(config.getInitLimit());
           quorumPeer.setSyncLimit(config.getSyncLimit());
+          quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
           quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
           quorumPeer.setConfigFileName(config.getConfigFilename());
           quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
index d3a631b..878e41b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
@@ -118,7 +118,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         public void run(){
             try {
-                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2);
+                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2, 2);
                 QuorumCnxManager cnxManager = peer.createCnxnManager();
                 QuorumCnxManager.Listener listener = cnxManager.listener;
                 if(listener != null){
@@ -162,7 +162,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         thread.start();
 
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -209,7 +209,7 @@ public class CnxManagerTest extends ZKTestCase {
                         new InetSocketAddress(deadAddress, PortAssignment.unique())));
         peerTmpdir[2] = ClientBase.createTmpDir();
 
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -237,7 +237,7 @@ public class CnxManagerTest extends ZKTestCase {
      */
     @Test
     public void testCnxManagerSpinLock() throws Exception {
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -302,7 +302,7 @@ public class CnxManagerTest extends ZKTestCase {
         // the connecting peer (id = 2) is a 3.4.6 observer
         peers.get(2L).type = LearnerType.OBSERVER;
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
-                peerClientPort[1], 3, 1, 1000, 2, 2);
+                peerClientPort[1], 3, 1, 1000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if (listener != null) {
@@ -349,7 +349,7 @@ public class CnxManagerTest extends ZKTestCase {
      */
     @Test
     public void testSocketTimeout() throws Exception {
-        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2, 2);
         QuorumCnxManager cnxManager = peer.createCnxnManager();
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -434,7 +434,7 @@ public class CnxManagerTest extends ZKTestCase {
         };
 
         QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0],
-                peerClientPort[0], 3, 0, 2000, 2, 2) {
+                peerClientPort[0], 3, 0, 2000, 2, 2, 2) {
             @Override
             public QuorumX509Util createX509Util() {
                 return mockedX509Util;
@@ -457,7 +457,7 @@ public class CnxManagerTest extends ZKTestCase {
             for (int sid = 0; sid < 3; sid++) {
                 QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid],
                         peerTmpdir[sid], peerClientPort[sid], 3, sid, 1000, 2,
-                        2);
+                        2, 2);
                 LOG.info("Starting peer {}", peer.getId());
                 peer.start();
                 peerList.add(sid, peer);
@@ -477,7 +477,7 @@ public class CnxManagerTest extends ZKTestCase {
                     // Restart halted node and verify count
                     peer = new QuorumPeer(peers, peerTmpdir[myid],
                             peerTmpdir[myid], peerClientPort[myid], 3, myid,
-                            1000, 2, 2);
+                            1000, 2, 2, 2);
                     LOG.info("Round {}, restarting peer ",
                             new Object[] { i, peer.getId() });
                     peer.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
index 2ccbc77..09adbc4 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
@@ -106,7 +106,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
         /*
          * Start server 0
          */
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
         peer.startLeaderElection();
         FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
         thread.start();
@@ -114,7 +114,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
         /*
          * Start mock server 1
          */
-        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2, 2);
         cnxManagers[0] = mockPeer.createCnxnManager();
         cnxManagers[0].listener.start();
 
@@ -123,7 +123,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
         /*
          * Start mock server 2
          */
-        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
+        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2, 2);
         cnxManagers[1] = mockPeer.createCnxnManager();
         cnxManagers[1].listener.start();
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
index 6583f90..ad2b180 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
@@ -78,7 +78,7 @@ public class FLELostMessageTest extends ZKTestCase {
         /*
          * Start server 0
          */
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2, 2);
         peer.startLeaderElection();
         FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
         thread.start();
@@ -94,7 +94,7 @@ public class FLELostMessageTest extends ZKTestCase {
     }
 
     void mockServer() throws InterruptedException, IOException {
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
         cnxManager = peer.createCnxnManager();
         cnxManager.listener.start();
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
index 455d04f..d856b79 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
@@ -52,7 +52,7 @@ public class FLEOutOfElectionTest {
                     new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
         }
         QuorumPeer peer = new QuorumPeer(peers, tmpdir, tmpdir, 
-                PortAssignment.unique(), 3, 3, 1000, 2, 2);
+                PortAssignment.unique(), 3, 3, 1000, 2, 2, 2);
         fle = new FastLeaderElection(peer, peer.createCnxnManager());
     }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
index 85295f8..7eccf60 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
@@ -140,6 +140,28 @@ public class LearnerTest extends ZKTestCase {
     }
 
     @Test
+    public void connectToLearnerMasterLimitTest() throws Exception {
+      TimeoutLearner learner = new TimeoutLearner();
+      learner.self = new QuorumPeer();
+      learner.self.setTickTime(2000);
+      learner.self.setInitLimit(2);
+      learner.self.setSyncLimit(2);
+      learner.self.setConnectToLearnerMasterLimit(5);
+      
+      InetSocketAddress addr = new InetSocketAddress(1111);
+      learner.setTimeMultiplier((long)4000 * 1000000);
+      learner.setPassConnectAttempt(5);
+      
+      try {
+          learner.connectToLeader(addr, "");
+          Assert.fail("should have thrown IOException!");
+      } catch (IOException e) {
+        Assert.assertTrue(learner.nanoTime() > 2000*5*1000000);
+        Assert.assertEquals(3, learner.getSockConnectAttempt());
+      }
+    }
+
+    @Test
     public void syncTest() throws Exception {
         File tmpFile = File.createTempFile("test", ".dir", testData);
         tmpFile.delete();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
index 43ed24b..d6b3774 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
@@ -39,6 +39,7 @@ public class QuorumPeerTest {
     private int tickTime = 2000;
     private int initLimit = 3;
     private int syncLimit = 3;
+    private int connectToLearnerMasterLimit = 3;
 
     /**
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2301
@@ -60,7 +61,7 @@ public class QuorumPeerTest {
          * QuorumPeer constructor without QuorumVerifier
          */
         QuorumPeer peer1 = new QuorumPeer(peersView, dataDir, dataDir, clientPort, electionAlg, myId, tickTime,
-                initLimit, syncLimit);
+                initLimit, syncLimit, connectToLearnerMasterLimit);
         String hostString1 = peer1.cnxnFactory.getLocalAddress().getHostString();
         assertEquals(clientIP.getHostAddress(), hostString1);
 
@@ -77,7 +78,7 @@ public class QuorumPeerTest {
                         new InetSocketAddress(clientIP, PortAssignment.unique()),
                         new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
         QuorumPeer peer2 = new QuorumPeer(peersView, dataDir, dataDir, clientPort, electionAlg, myId, tickTime,
-                initLimit, syncLimit);
+                initLimit, syncLimit, connectToLearnerMasterLimit);
         String hostString2 = peer2.cnxnFactory.getLocalAddress().getHostString();
         assertEquals(clientIP.getHostAddress(), hostString2);
         // cleanup
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index 1a1c796..456a3ef 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -126,6 +126,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             fwriter.write("tickTime=" + tickTime + "\n");
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
+            fwriter.write("connectToLearnerMasterLimit=5\n");
 
             tmpDir = new File(baseDir, "data");
             if (!tmpDir.mkdir()) {
@@ -229,6 +230,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             fwriter.write("tickTime=4000\n");
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
+            fwriter.write("connectToLearnerMasterLimit=5\n");
             if(configs != null){
                 fwriter.write(configs);
             }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
index f350abf..4d07ffa 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
@@ -208,9 +208,9 @@ public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
         private boolean newLeaderMessage = false;
 
         public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort,
-                int electionAlg, long myid, int tickTime, int initLimit, int syncLimit)
+                int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit)
                 throws IOException {
-            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false,
                     ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
         }
 
@@ -256,7 +256,8 @@ public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
                 throws IOException, AdminServerException {
             quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(),
                     config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(),
-                    config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit());
+                    config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit(),
+                    config.getConnectToLearnerMasterLimit());
             quorumPeer.setConfigFileName(config.getConfigFilename());
             quorumPeer.start();
             try {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
index 8bf365f..2766c29 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
@@ -165,7 +165,7 @@ public class FLENewEpochTest extends ZKTestCase {
           }
 
           for(int i = 1; i < count; i++) {
-              QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+              QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
               peer.startLeaderElection();
               LEThread thread = new LEThread(peer, i);
               thread.start();
@@ -174,7 +174,7 @@ public class FLENewEpochTest extends ZKTestCase {
           if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS))
               Assert.fail("First leader election failed");
 
-          QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+          QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
           peer.startLeaderElection();
           LEThread thread = new LEThread(peer, 0);
           thread.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
index bc43775..4e252de 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
@@ -75,7 +75,7 @@ public class FLEPredicateTest extends ZKTestCase {
         try{
             File tmpDir = ClientBase.createTmpDir();
             QuorumPeer peer = new QuorumPeer(peers, tmpDir, tmpDir,
-                                        PortAssignment.unique(), 3, 0, 1000, 2, 2);
+                                        PortAssignment.unique(), 3, 0, 1000, 2, 2, 2);
         
             MockFLE mock = new MockFLE(peer);
             mock.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
index b77b93b..f930391 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
@@ -123,7 +123,7 @@ public class FLERestartTest extends ZKTestCase {
                             QuorumBase.shutdown(peer);
                             ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
 
-                            peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+                            peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
                             peer.startLeaderElection();
                             peerRound++;
                         } else {
@@ -171,7 +171,7 @@ public class FLERestartTest extends ZKTestCase {
         }
 
         for(int i = 0; i < count; i++) {
-            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
             peer.startLeaderElection();
             FLERestartThread thread = new FLERestartThread(peer, i);
             thread.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
index 859c4a4..f6dc513 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
@@ -324,7 +324,7 @@ public class FLETest extends ZKTestCase {
          */
         for(int i = 0; i < count; i++) {
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
-                    port[i], 3, i, 1000, 2, 2);
+                    port[i], 3, i, 1000, 2, 2, 2);
             peer.startLeaderElection();
             LEThread thread = new LEThread(this, peer, i, rounds, quora);
             thread.start();
@@ -429,7 +429,7 @@ public class FLETest extends ZKTestCase {
         // start 2 peers and verify if they form the cluster
         for (sid = 0; sid < 2; sid++) {
             peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
-                                             port[sid], 3, sid, 2000, 2, 2);
+                                             port[sid], 3, sid, 2000, 2, 2, 2);
             LOG.info("Starting peer " + peer.getId());
             peer.start();
             peerList.add(sid, peer);
@@ -443,7 +443,7 @@ public class FLETest extends ZKTestCase {
             !v1.isSuccess());
         // Start 3rd peer and check if it goes in LEADING state
         peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
-                 port[sid], 3, sid, 2000, 2, 2);
+                 port[sid], 3, sid, 2000, 2, 2, 2);
         LOG.info("Starting peer " + peer.getId());
         peer.start();
         peerList.add(sid, peer);
@@ -488,7 +488,7 @@ public class FLETest extends ZKTestCase {
         // start 2 peers and verify if they form the cluster
         for (sid = 0; sid < 2; sid++) {
             peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
-                                             port[sid], 3, sid, 2000, 2, 2);
+                                             port[sid], 3, sid, 2000, 2, 2, 2);
             LOG.info("Starting peer " + peer.getId());
             peer.start();
             peerList.add(sid, peer);
@@ -510,7 +510,7 @@ public class FLETest extends ZKTestCase {
         peer.setCurrentVote(newVote);
         // Start 3rd peer and check if it joins the quorum
         peer = new QuorumPeer(peers, tmpdir[2], tmpdir[2],
-                 port[2], 3, 2, 2000, 2, 2);
+                 port[2], 3, 2, 2000, 2, 2, 2);
         LOG.info("Starting peer " + peer.getId());
         peer.start();
         peerList.add(sid, peer);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
index 351b008..65d436d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
@@ -154,7 +154,7 @@ public class FLEZeroWeightTest extends ZKTestCase {
 
         for(int i = 0; i < count; i++) {
             QuorumHierarchical hq = new QuorumHierarchical(qp);
-            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq);
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2, hq);
             peer.startLeaderElection();
             LEThread thread = new LEThread(peer, i);
             thread.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
index a1dc6c5..bc746e5 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
@@ -145,6 +145,7 @@ public class HierarchicalQuorumTest extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        int connectToLearnerMasterLimit = 3;
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1, 
                 new InetSocketAddress("127.0.0.1", port1),
@@ -178,22 +179,22 @@ public class HierarchicalQuorumTest extends ClientBase {
                qp.setProperty("server.5", "127.0.0.1:" + port5 + ":" + leport5 +  ":observer" + ";" + clientport5);
         }
         QuorumHierarchical hq1 = new QuorumHierarchical(qp); 
-        s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, hq1);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq1);
         Assert.assertEquals(clientport1, s1.getClientPort());
         
         LOG.info("creating QuorumPeer 2 port " + clientport2);
         QuorumHierarchical hq2 = new QuorumHierarchical(qp); 
-        s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, hq2);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq2);
         Assert.assertEquals(clientport2, s2.getClientPort());
         
         LOG.info("creating QuorumPeer 3 port " + clientport3);
         QuorumHierarchical hq3 = new QuorumHierarchical(qp); 
-        s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, hq3);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq3);
         Assert.assertEquals(clientport3, s3.getClientPort());
         
         LOG.info("creating QuorumPeer 4 port " + clientport4);
         QuorumHierarchical hq4 = new QuorumHierarchical(qp); 
-        s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, hq4);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq4);
         if (withObservers) {
             s4.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
         }
@@ -201,7 +202,7 @@ public class HierarchicalQuorumTest extends ClientBase {
                        
         LOG.info("creating QuorumPeer 5 port " + clientport5);
         QuorumHierarchical hq5 = new QuorumHierarchical(qp); 
-        s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, hq5);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq5);
         if (withObservers) {
             s5.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
index fcaa9b6..a2b0615 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -137,6 +137,7 @@ public class QuorumBase extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        int connectToLearnerMasterLimit = 3;
         Map<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1,
                 new InetSocketAddress(LOCALADDR, port1),
@@ -170,19 +171,19 @@ public class QuorumBase extends ClientBase {
         }
 
         LOG.info("creating QuorumPeer 1 port " + portClient1);
-        s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient1, s1.getClientPort());
         LOG.info("creating QuorumPeer 2 port " + portClient2);
-        s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient2, s2.getClientPort());
         LOG.info("creating QuorumPeer 3 port " + portClient3);
-        s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient3, s3.getClientPort());
         LOG.info("creating QuorumPeer 4 port " + portClient4);
-        s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient4, s4.getClientPort());
         LOG.info("creating QuorumPeer 5 port " + portClient5);
-        s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         Assert.assertEquals(portClient5, s5.getClientPort());
 
         if (withObservers) {
@@ -299,6 +300,7 @@ public class QuorumBase extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+        int connectToLearnerMasterLimit = 3;
 
         if(peers == null){
             peers = new HashMap<Long,QuorumServer>();
@@ -333,27 +335,27 @@ public class QuorumBase extends ClientBase {
         switch(i){
         case 1:
             LOG.info("creating QuorumPeer 1 port " + portClient1);
-            s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+            s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient1, s1.getClientPort());
             break;
         case 2:
             LOG.info("creating QuorumPeer 2 port " + portClient2);
-            s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+            s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient2, s2.getClientPort());
             break;
         case 3:
             LOG.info("creating QuorumPeer 3 port " + portClient3);
-            s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+            s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient3, s3.getClientPort());
             break;
         case 4:
             LOG.info("creating QuorumPeer 4 port " + portClient4);
-            s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+            s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient4, s4.getClientPort());
             break;
         case 5:
             LOG.info("creating QuorumPeer 5 port " + portClient5);
-            s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+            s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
             Assert.assertEquals(portClient5, s5.getClientPort());
         }
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
index 314171d..6d711fc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -73,6 +73,8 @@ public class QuorumUtil {
     private int initLimit;
 
     private int syncLimit;
+    
+    private int connectToLearnerMasterLimit;
 
     private int electionAlg;
 
@@ -94,6 +96,7 @@ public class QuorumUtil {
             tickTime = 2000;
             initLimit = 3;
             this.syncLimit = syncLimit;
+            connectToLearnerMasterLimit = 3;
             electionAlg = 3;
             hostPort = "";
 
@@ -115,7 +118,7 @@ public class QuorumUtil {
                 PeerStruct ps = peers.get(i);
                 LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort);
                 ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort,
-                        electionAlg, ps.id, tickTime, initLimit, syncLimit);
+                        electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
                 Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
             }
         } catch (Exception e) {
@@ -202,7 +205,7 @@ public class QuorumUtil {
         PeerStruct ps = getPeer(id);
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
-                ps.id, tickTime, initLimit, syncLimit);
+                ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         if (localSessionEnabled) {
             ps.peer.enableLocalSessions(true);
         }
@@ -221,7 +224,7 @@ public class QuorumUtil {
         PeerStruct ps = getPeer(id);
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
-                ps.id, tickTime, initLimit, syncLimit);
+                ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         if (localSessionEnabled) {
             ps.peer.enableLocalSessions(true);
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
index d7d5ffa..2290a34 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
@@ -185,6 +185,7 @@ public class TruncateTest extends ZKTestCase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
+				int connectToLearnerMasterLimit = 3;
 
         int port1 = PortAssignment.unique();
         int port2 = PortAssignment.unique();
@@ -205,9 +206,9 @@ public class TruncateTest extends ZKTestCase {
                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
                        new InetSocketAddress("127.0.0.1", port3)));
 
-        QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit);
+        QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         s2.start();
-        QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit);
+        QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         s3.start();
         zk = ClientBase.createZKClient("127.0.0.1:" + port2, 15000);
 
@@ -223,7 +224,7 @@ public class TruncateTest extends ZKTestCase {
         } catch(KeeperException.NoNodeException e) {
             // this is what we want
         }
-        QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit);
+        QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
         s1.start();
         ZooKeeper zk1 = ClientBase.createZKClient("127.0.0.1:" + port1, 15000);
         zk1.getData("/9", false, new Stat());


Mime
View raw message