zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shra...@apache.org
Subject zookeeper git commit: ZOOKEEPER-2959: ignore accepted epoch and LEADERINFO ack from observers
Date Thu, 10 May 2018 04:00:59 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/master 43f117ef5 -> 088dfdf18


ZOOKEEPER-2959: ignore accepted epoch and LEADERINFO ack from observers


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/088dfdf1
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/088dfdf1
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/088dfdf1

Branch: refs/heads/master
Commit: 088dfdf188663f6bad79b0e87b710737b318537d
Parents: 43f117e
Author: Alexander Shraer <ashraer@apple.com>
Authored: Wed May 9 21:00:15 2018 -0700
Committer: Alexander Shraer <ashraer@apple.com>
Committed: Wed May 9 21:00:15 2018 -0700

----------------------------------------------------------------------
 .../apache/zookeeper/server/quorum/Leader.java  |  30 ++-
 .../zookeeper/server/quorum/LearnerHandler.java |   2 +-
 .../server/quorum/LeaderWithObserverTest.java   | 185 +++++++++++++++++++
 .../zookeeper/server/quorum/Zab1_0Test.java     | 142 +-------------
 .../zookeeper/server/quorum/ZabUtils.java       | 166 +++++++++++++++++
 5 files changed, 379 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/088dfdf1/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
index 52a9113..4774a42 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
@@ -97,7 +97,8 @@ public class Leader {
 
     final QuorumPeer self;
 
-    private boolean quorumFormed = false;
+    // VisibleForTesting
+    protected boolean quorumFormed = false;
 
     // the follower acceptor thread
     volatile LearnerCnxAcceptor cnxAcceptor = null;
@@ -358,7 +359,8 @@ public class Leader {
 
     private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
 
-    private final Proposal newLeaderProposal = new Proposal();
+    // VisibleForTesting
+    protected final Proposal newLeaderProposal = new Proposal();
 
     class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
         private volatile boolean stop = false;
@@ -507,7 +509,7 @@ public class Leader {
              self.setCurrentEpoch(epoch);    
             
              try {
-                 waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
+                 waitForNewLeaderAck(self.getId(), zk.getZxid());
              } catch (InterruptedException e) {
                  shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                          + newLeaderProposal.ackSetsToString() + " ]");
@@ -1163,7 +1165,8 @@ public class Leader {
 
         return lastProposed;
     }
-    private final HashSet<Long> connectingFollowers = new HashSet<Long>();
+    // VisibleForTesting
+    protected final Set<Long> connectingFollowers = new HashSet<Long>();
     public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException,
IOException {
         synchronized(connectingFollowers) {
             if (!waitingForNewEpoch) {
@@ -1172,7 +1175,9 @@ public class Leader {
             if (lastAcceptedEpoch >= epoch) {
                 epoch = lastAcceptedEpoch+1;
             }
-            connectingFollowers.add(sid);
+            if (isParticipant(sid)) {
+                connectingFollowers.add(sid);
+            }
             QuorumVerifier verifier = self.getQuorumVerifier();
             if (connectingFollowers.contains(self.getId()) &&
                                             verifier.containsQuorum(connectingFollowers))
{
@@ -1195,8 +1200,10 @@ public class Leader {
         }
     }
 
-    private final HashSet<Long> electingFollowers = new HashSet<Long>();
-    private boolean electionFinished = false;
+    // VisibleForTesting
+    protected final Set<Long> electingFollowers = new HashSet<Long>();
+    // VisibleForTesting
+    protected boolean electionFinished = false;
     public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException
{
         synchronized(electingFollowers) {
             if (electionFinished) {
@@ -1210,7 +1217,7 @@ public class Leader {
                                                     + leaderStateSummary.getLastZxid()
                                                     + " (last zxid)");
                 }
-                if (ss.getLastZxid() != -1) {
+                if (ss.getLastZxid() != -1 && isParticipant(id)) {
                     electingFollowers.add(id);
                 }
             }
@@ -1294,10 +1301,9 @@ public class Leader {
      * sufficient acks.
      *
      * @param sid
-     * @param learnerType
      * @throws InterruptedException
      */
-    public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
+    public void waitForNewLeaderAck(long sid, long zxid)
             throws InterruptedException {
 
         synchronized (newLeaderProposal.qvAcksetPairs) {
@@ -1393,4 +1399,8 @@ public class Leader {
     private boolean isRunning() {
         return self.isRunning() && zk.isRunning();
     }
+
+    private boolean isParticipant(long sid) {
+        return self.getQuorumVerifier().getVotingMembers().containsKey(sid);
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/088dfdf1/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 7247f5c..f6c68b0 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -507,7 +507,7 @@ public class LearnerHandler extends ZooKeeperThread {
             if(LOG.isDebugEnabled()){
             	LOG.debug("Received NEWLEADER-ACK message from " + sid);   
             }
-            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
+            leader.waitForNewLeaderAck(getSid(), qp.getZxid());
 
             syncLimitCheck.start();
             

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/088dfdf1/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
new file mode 100644
index 0000000..2548aca
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
+import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
+
+public class LeaderWithObserverTest {
+
+    QuorumPeer peer;
+    Leader leader;
+    File tmpDir;
+    long participantId;
+    long observerId;
+
+    @Before
+    public void setUp() throws Exception {
+        tmpDir = ClientBase.createTmpDir();
+        peer = createQuorumPeer(tmpDir);
+        participantId = 1;
+        Map<Long, QuorumPeer.QuorumServer> peers = peer.getQuorumVerifier().getAllMembers();
+        observerId = peers.size();
+        leader = createLeader(tmpDir, peer);
+        peer.leader = leader;
+        peers.put(observerId, new QuorumPeer.QuorumServer(
+                observerId, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                QuorumPeer.LearnerType.OBSERVER));
+
+        // these tests are serial, we can speed up InterruptedException
+        peer.tickTime = 1;
+    }
+
+    @After
+    public void tearDown(){
+        leader.shutdown("end of test");
+        tmpDir.delete();
+    }
+
+    @Test
+    public void testGetEpochToPropose() throws Exception {
+        long lastAcceptedEpoch = 5;
+        peer.setAcceptedEpoch(5);
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 0, leader.connectingFollowers.size());
+        Assert.assertTrue(leader.waitingForNewEpoch);
+        try {
+            // Leader asks for epoch (mocking Leader.lead behavior)
+            // First add to connectingFollowers
+            leader.getEpochToPropose(peer.getId(), lastAcceptedEpoch);
+        } catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
+        Assert.assertEquals("Leader shouldn't set new epoch until quorum of participants
is in connectingFollowers",
+                lastAcceptedEpoch, peer.getAcceptedEpoch());
+        Assert.assertTrue(leader.waitingForNewEpoch);
+        try {
+            // Observer asks for epoch (mocking LearnerHandler behavior)
+            leader.getEpochToPropose(observerId, lastAcceptedEpoch);
+        } catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
+        Assert.assertEquals("Leader shouldn't set new epoch after observer asks for epoch",
+                lastAcceptedEpoch, peer.getAcceptedEpoch());
+        Assert.assertTrue(leader.waitingForNewEpoch);
+        try {
+            // Now participant asks for epoch (mocking LearnerHandler behavior). Second add
to connectingFollowers.
+            // Triggers verifier.containsQuorum = true
+            leader.getEpochToPropose(participantId, lastAcceptedEpoch);
+        } catch (Exception e) {
+            Assert.fail("Timed out in getEpochToPropose");
+        }
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 2, leader.connectingFollowers.size());
+        Assert.assertEquals("Leader should record next epoch", lastAcceptedEpoch + 1, peer.getAcceptedEpoch());
+        Assert.assertFalse(leader.waitingForNewEpoch);
+    }
+
+    @Test
+    public void testWaitForEpochAck() throws Exception {
+        // things needed for waitForEpochAck to run (usually in leader.lead(), but we're
not running leader here)
+        leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());
+
+        Assert.assertEquals("Unexpected vote in electingFollowers", 0, leader.electingFollowers.size());
+        Assert.assertFalse(leader.electionFinished);
+        try {
+            // leader calls waitForEpochAck, first add to electingFollowers
+            leader.waitForEpochAck(peer.getId(), new StateSummary(0, 0));
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
+        Assert.assertFalse(leader.electionFinished);
+        try {
+            // observer calls waitForEpochAck, should fail verifier.containsQuorum
+            leader.waitForEpochAck(observerId, new StateSummary(0, 0));
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
+        Assert.assertFalse(leader.electionFinished);
+        try {
+            // second add to electingFollowers, verifier.containsQuorum=true, waitForEpochAck
returns without exceptions
+            leader.waitForEpochAck(participantId, new StateSummary(0, 0));
+            Assert.assertEquals("Unexpected vote in electingFollowers", 2, leader.electingFollowers.size());
+            Assert.assertTrue(leader.electionFinished);
+        } catch (Exception e) {
+            Assert.fail("Timed out in waitForEpochAck");
+        }
+    }
+
+    @Test
+    public void testWaitForNewLeaderAck() throws Exception {
+        long zxid = leader.zk.getZxid();
+
+        // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're
not running leader here)
+        leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
+        leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
+
+        Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
+        Assert.assertEquals("Unexpected vote in ackSet", 0, ackSet.size());
+        Assert.assertFalse(leader.quorumFormed);
+        try {
+            // leader calls waitForNewLeaderAck, first add to ackSet
+            leader.waitForNewLeaderAck(peer.getId(), zxid);
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
+        Assert.assertFalse(leader.quorumFormed);
+        try {
+            // observer calls waitForNewLeaderAck, should fail verifier.containsQuorum
+            leader.waitForNewLeaderAck(observerId, zxid);
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in ackSet", 1, ackSet.size());
+        Assert.assertFalse(leader.quorumFormed);
+        try {
+            // second add to ackSet, verifier.containsQuorum=true, waitForNewLeaderAck returns
without exceptions
+            leader.waitForNewLeaderAck(participantId, zxid);
+            Assert.assertEquals("Unexpected vote in ackSet", 2, ackSet.size());
+            Assert.assertTrue(leader.quorumFormed);
+        } catch (Exception e) {
+            Assert.fail("Timed out in waitForEpochAck");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/088dfdf1/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 10ec1ea..aabd9d8 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -18,6 +18,11 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
+import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader;
+import static org.apache.zookeeper.server.quorum.ZabUtils.MockLeader;
+import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
+
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -27,8 +32,6 @@ import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.EOFException;
@@ -37,15 +40,12 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
-import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -55,13 +55,10 @@ import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ByteBufferOutputStream;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.TestUtils;
@@ -78,7 +75,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Zab1_0Test extends ZKTestCase {
-    private static final int SYNC_LIMIT = 2;
 
     private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);
 
@@ -109,26 +105,6 @@ public class Zab1_0Test extends ZKTestCase {
             }
         }
     }
-
-    
-   private static final class MockLeader extends Leader {
-       
-       MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
-       throws IOException {
-           super(qp, zk);
-       }
-       
-       /**
-        * This method returns the value of the variable that holds the epoch
-        * to be proposed and that has been proposed, depending on the point
-        * of the execution in which it is called. 
-        * 
-        * @return epoch
-        */
-       public long getCurrentEpochToPropose() {
-           return epoch;
-       }
-   }
    
    public static final class FollowerMockThread extends Thread {
     	private final Leader leader;
@@ -287,54 +263,6 @@ public class Zab1_0Test extends ZKTestCase {
         }
     }
 
-    private static final class NullServerCnxnFactory extends ServerCnxnFactory {
-        public void startup(ZooKeeperServer zkServer, boolean startServer)
-                throws IOException, InterruptedException {
-        }
-        public void start() {
-        }
-        public void shutdown() {
-        }
-        public void setMaxClientCnxnsPerHost(int max) {
-        }
-        public void join() throws InterruptedException {
-        }
-        public int getMaxClientCnxnsPerHost() {
-            return 0;
-        }
-        public int getLocalPort() {
-            return 0;
-        }
-        public InetSocketAddress getLocalAddress() {
-            return null;
-        }
-        public Iterable<ServerCnxn> getConnections() {
-            return null;
-        }
-        public void configure(InetSocketAddress addr, int maxcc, boolean secure)
-                throws IOException {
-        }
-
-        public boolean closeSession(long sessionId) {
-            return false;
-        }
-        public void closeAll() {
-        }
-        @Override
-        public int getNumAliveConnections() {
-            return 0;
-        }
-		@Override
-		public void reconfigure(InetSocketAddress addr) {			
-		}
-        @Override
-        public void resetAllConnectionStats() {
-        }
-        @Override
-        public Iterable<Map<String, Object>> getAllConnectionInfo(boolean brief)
{
-            return null;
-        }
-    }
     static Socket[] getSocketPair() throws IOException {
         ServerSocket ss =
             new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
@@ -1006,7 +934,7 @@ public class Zab1_0Test extends ZKTestCase {
 
                 LOG.info("Proposal sent.");
 
-                for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) {
+                for (int i = 0; i < (2 * ZabUtils.SYNC_LIMIT) + 2; i++) {
                     try {
                         ia.readRecord(qp, null);
                         LOG.info("Ping received: " + i);
@@ -1252,27 +1180,6 @@ public class Zab1_0Test extends ZKTestCase {
         });
     }
     
-    private Leader createLeader(File tmpDir, QuorumPeer peer)
-    throws IOException, NoSuchFieldException, IllegalAccessException {
-        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
-        return new Leader(peer, zk);
-    }
-    
-    private Leader createMockLeader(File tmpDir, QuorumPeer peer)
-    throws IOException, NoSuchFieldException, IllegalAccessException {
-        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
-        return new MockLeader(peer, zk);
-    }
-    
-    private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
-            throws IOException, NoSuchFieldException, IllegalAccessException {
-        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
-        peer.setTxnFactory(logFactory);
-        ZKDatabase zkDb = new ZKDatabase(logFactory);
-        LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
-        return zk;
-    }
-    
     static class ConversableFollower extends Follower {
 
         ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
@@ -1326,41 +1233,6 @@ public class Zab1_0Test extends ZKTestCase {
         return new ConversableObserver(peer, zk);
     }
 
-    private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException
{
-        HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
-        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
-        peer.syncLimit = SYNC_LIMIT;
-        peer.initLimit = 2;
-        peer.tickTime = 2000;
-        
-        peers.put(0L, new QuorumServer(
-            0, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-               new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-               new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
-        peers.put(1L, new QuorumServer(
-            1, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-               new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-               new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
-        peers.put(2L, new QuorumServer(
-            2, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-               new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-               new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
-        
-        peer.setQuorumVerifier(new QuorumMaj(peers), false);
-        peer.setCnxnFactory(new NullServerCnxnFactory());
-        File version2 = new File(tmpDir, "version-2");
-        version2.mkdir();
-        ClientBase.createInitializeFile(tmpDir);
-        FileOutputStream fos;
-        fos = new FileOutputStream(new File(version2, "currentEpoch"));
-        fos.write("0\n".getBytes());
-        fos.close();
-        fos = new FileOutputStream(new File(version2, "acceptedEpoch"));
-        fos.write("0\n".getBytes());
-        fos.close();
-        return peer;
-    }
-
     private String readContentsOfFile(File f) throws IOException {
         return new BufferedReader(new FileReader(f)).readLine();
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/088dfdf1/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java
new file mode 100644
index 0000000..a735275
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.test.ClientBase;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ZabUtils {
+
+    private ZabUtils() {}
+
+    public static final int SYNC_LIMIT = 2;
+
+    public static QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException
{
+        HashMap<Long, QuorumPeer.QuorumServer> peers = new HashMap<Long, QuorumPeer.QuorumServer>();
+        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
+        peer.syncLimit = SYNC_LIMIT;
+        peer.initLimit = 2;
+        peer.tickTime = 2000;
+
+        peers.put(0L, new QuorumPeer.QuorumServer(
+                0, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
+        peers.put(1L, new QuorumPeer.QuorumServer(
+                1, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
+        peers.put(2L, new QuorumPeer.QuorumServer(
+                2, new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
+                new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
+
+        peer.setQuorumVerifier(new QuorumMaj(peers), false);
+        peer.setCnxnFactory(new NullServerCnxnFactory());
+        File version2 = new File(tmpDir, "version-2");
+        version2.mkdir();
+        ClientBase.createInitializeFile(tmpDir);
+        FileOutputStream fos = new FileOutputStream(new File(version2, "currentEpoch"));
+        fos.write("0\n".getBytes());
+        fos.close();
+        fos = new FileOutputStream(new File(version2, "acceptedEpoch"));
+        fos.write("0\n".getBytes());
+        fos.close();
+        return peer;
+    }
+
+    public static Leader createLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException {
+        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+        return new Leader(peer, zk);
+    }
+
+    public static Leader createMockLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException {
+        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+        return new MockLeader(peer, zk);
+    }
+
+    private static LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException {
+        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(logFactory);
+        ZKDatabase zkDb = new ZKDatabase(logFactory);
+        LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
+        return zk;
+    }
+
+    private static final class NullServerCnxnFactory extends ServerCnxnFactory {
+        public void startup(ZooKeeperServer zkServer, boolean startServer)
+                throws IOException, InterruptedException {
+        }
+        public void start() {
+        }
+        public void shutdown() {
+        }
+        public void setMaxClientCnxnsPerHost(int max) {
+        }
+        public void join() throws InterruptedException {
+        }
+        public int getMaxClientCnxnsPerHost() {
+            return 0;
+        }
+        public int getLocalPort() {
+            return 0;
+        }
+        public InetSocketAddress getLocalAddress() {
+            return null;
+        }
+        public Iterable<ServerCnxn> getConnections() {
+            return null;
+        }
+        public void configure(InetSocketAddress addr, int maxcc, boolean secure)
+                throws IOException {
+        }
+
+        public boolean closeSession(long sessionId) {
+            return false;
+        }
+        public void closeAll() {
+        }
+        @Override
+        public int getNumAliveConnections() {
+            return 0;
+        }
+        @Override
+        public void reconfigure(InetSocketAddress addr) {
+        }
+        @Override
+        public void resetAllConnectionStats() {
+        }
+        @Override
+        public Iterable<Map<String, Object>> getAllConnectionInfo(boolean brief)
{
+            return null;
+        }
+    }
+
+    public static final class MockLeader extends Leader {
+
+        MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
+                throws IOException {
+            super(qp, zk);
+        }
+
+        /**
+         * This method returns the value of the variable that holds the epoch
+         * to be proposed and that has been proposed, depending on the point
+         * of the execution in which it is called.
+         *
+         * @return epoch
+         */
+        public long getCurrentEpochToPropose() {
+            return epoch;
+        }
+    }
+}


Mime
View raw message