zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cami...@apache.org
Subject svn commit: r1198053 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Sat, 05 Nov 2011 20:57:13 GMT
Author: camille
Date: Sat Nov  5 20:57:13 2011
New Revision: 1198053

URL: http://svn.apache.org/viewvc?rev=1198053&view=rev
Log:
ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing intermittently.
ZOOKEEPER-1282. Learner.java not following Zab 1.0 protocol - setCurrentEpoch should be done
upon receipt of NEWLEADER (before acking it) and not upon receipt of UPTODATE.
ZOOKEEPER-1291. AcceptedEpoch not updated at leader before it proposes the epoch to followers.

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1198053&r1=1198052&r2=1198053&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Nov  5 20:57:13 2011
@@ -44,6 +44,15 @@ BUGFIXES:
 
   ZOOKEEPER-1271. testEarlyLeaderAbandonment failing on solaris -
   clients not retrying connection (mahadev via phunt)
+  
+  ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing 
+  intermittently. (breed, camille and Alex Shraer via camille)
+  
+  ZOOKEEPER-1282. Learner.java not following Zab 1.0 protocol - 
+  setCurrentEpoch should be done upon receipt of NEWLEADER 
+  (before acking it) and not upon receipt of UPTODATE (breed via camille)
+  
+  ZOOKEEPER-1291. AcceptedEpoch not updated at leader before it proposes the epoch to followers. (Alex Shraer via camille)
 
 IMPROVEMENTS:
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1198053&r1=1198052&r2=1198053&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sat Nov  5 20:57:13 2011
@@ -309,8 +309,7 @@ public class Leader {
             cnxAcceptor.start();
 
             readyToStart = true;
-            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
-            self.setAcceptedEpoch(epoch);
+            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());      
     
 
             zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
 
@@ -764,7 +763,7 @@ public class Leader {
     }
 
     private final HashSet<Long> connectingFollowers = new HashSet<Long>();
-	public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
+	public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
 		synchronized(connectingFollowers) {
 			if (!waitingForNewEpoch) {
 				return epoch;
@@ -777,6 +776,7 @@ public class Leader {
 			if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers))

 {
 				waitingForNewEpoch = false;
+				self.setAcceptedEpoch(epoch);
 				connectingFollowers.notifyAll();
 			} else {
                    long start = System.currentTimeMillis();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1198053&r1=1198052&r2=1198053&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Sat Nov  5 20:57:13 2011
@@ -312,8 +312,10 @@ public class Learner {       
     protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
         QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
         QuorumPacket qp = new QuorumPacket();
+        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
         
         readPacket(qp);   
+        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
@@ -376,12 +378,16 @@ public class Learner {       
                     packetsNotCommitted.add(pif);
                     break;
                 case Leader.COMMIT:
-                    pif = packetsNotCommitted.peekFirst();
-                    if (pif.hdr.getZxid() != qp.getZxid()) {
-                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+                    if (!snapshotTaken) {
+                        pif = packetsNotCommitted.peekFirst();
+                        if (pif.hdr.getZxid() != qp.getZxid()) {
+                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+                        } else {
+                            zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+                            packetsNotCommitted.remove();
+                        }
                     } else {
-                        zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
-                        packetsNotCommitted.remove();
+                        packetsCommitted.add(qp.getZxid());
                     }
                     break;
                 case Leader.INFORM:
@@ -390,28 +396,34 @@ public class Learner {       
                     zk.getZKDatabase().processTxn(hdr, txn);
                     break;
                 case Leader.UPTODATE:
-                    if (!snapshotTaken) {
+                    if (!snapshotTaken) { // true for the pre v1.0 case
                         zk.takeSnapshot();
+                        self.setCurrentEpoch(newEpoch);
                     }
                     self.cnxnFactory.setZooKeeperServer(zk);                
                     break outerLoop;
                 case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
                     zk.takeSnapshot();
+                    self.setCurrentEpoch(newEpoch);
                     snapshotTaken = true;
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;
                 }
             }
         }
-        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
-        self.setCurrentEpoch(newEpoch);
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
         sock.setSoTimeout(self.tickTime * self.syncLimit);
         zk.startup();
-        //We have to have a commit processor to do this
-        for(PacketInFlight p: packetsNotCommitted) {
-            ((FollowerZooKeeperServer)zk).logRequest(p.hdr, p.rec);
+        // We need to log the stuff that came in between the snapshot and the uptodate
+        if (zk instanceof FollowerZooKeeperServer) {
+            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
+            for(PacketInFlight p: packetsNotCommitted) {
+                fzk.logRequest(p.hdr, p.rec);
+            }
+            for(Long zxid: packetsCommitted) {
+                fzk.commit(zxid);
+            }
         }
     }
     

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1198053&r1=1198052&r2=1198053&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Sat Nov  5 20:57:13 2011
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -33,19 +34,24 @@ import org.apache.jute.BinaryInputArchiv
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ByteBufferOutputStream;
-import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.LearnerInfo;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -226,10 +232,10 @@ public class Zab1_0Test {
     }
     
     static public interface FollowerConversation {
-        void converseWithFollower(InputArchive ia, OutputArchive oa) throws Exception;
+        void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
     }
     
-    public void testConversation(LeaderConversation conversation) throws Exception {
+    public void testLeaderConversation(LeaderConversation conversation) throws Exception {
         Socket pair[] = getSocketPair();
         Socket leaderSocket = pair[0];
         Socket followerSocket = pair[1];
@@ -270,12 +276,215 @@ public class Zab1_0Test {
             }
         }
     }
-        
+    
+    public void testFollowerConversation(FollowerConversation conversation) throws Exception {
+        File tmpDir = File.createTempFile("test", "dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        Thread followerThread = null;
+        ConversableFollower follower = null;
+        QuorumPeer peer = null;
+        try {
+            peer = createQuorumPeer(tmpDir);
+            follower = createFollower(tmpDir, peer);
+            peer.follower = follower;
+            
+            ServerSocket ss = new ServerSocket();
+            ss.bind(null);
+            follower.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+            final Follower followerForThread = follower;
+            
+            followerThread = new Thread() {
+                public void run() {
+                    try {
+                        followerForThread.followLeader();
+                    } catch(Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            followerThread.start();
+            Socket leaderSocket = ss.accept();
+            
+            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
+                    .getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
+                    .getOutputStream());
+
+            conversation.converseWithFollower(ia, oa, follower);
+        } finally {
+            if (follower != null) {
+                follower.shutdown();
+            }
+            if (followerThread != null) {
+                followerThread.interrupt();
+                followerThread.join();
+            }
+            if (peer != null) {
+                peer.shutdown();
+            }
+            recursiveDelete(tmpDir);
+        }
+    }
+
+    @Test
+    public void testNormalFollowerRun() throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive oa,
+                    Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir");
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
+                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                try {
+                    Assert.assertEquals(0, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    Stat stat = new Stat();
+                    Assert.assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    Assert.assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+                    Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    Assert.assertEquals(learnInfo.getServerid(), 0);
+                
+                    // We are simulating an established leader, so the epoch is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte protoBytes[] = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+                
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+                    Assert.assertEquals(0, qp.getZxid());
+                    Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+                    
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);
+
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(1, f.self.getCurrentEpoch());
+                    
+                    Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+                    
+                    // Make sure the data was recorded in the filesystem ok
+                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    long lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
+                    Assert.assertEquals(firstZxid, lastZxid);
+
+                    // Propose an update
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1000);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+                    
+                    // We want to track the change with a callback rather than depending on timing
+                    class TrackerWatcher implements Watcher {
+                        boolean changed;
+                        synchronized void waitForChange() throws InterruptedException {
+                            while(!changed) {
+                                wait();
+                            }
+                        }
+                        @Override
+                        public void process(WatchedEvent event) {
+                            if (event.getType() == EventType.NodeDataChanged) {
+                                synchronized(this) {
+                                    changed = true;
+                                    notifyAll();
+                                }
+                            }
+                        }
+                        synchronized public boolean changed() {
+                            return changed;
+                        }
+                        
+                    };
+                    TrackerWatcher watcher = new TrackerWatcher();
+                    
+                    // The change should not have happened yet, since we haven't committed
+                    Assert.assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
+                    
+                    // The change should happen now
+                    qp.setType(Leader.COMMIT);
+                    qp.setZxid(proposalZxid);
+                    oa.writeRecord(qp, null);
+                    
+                    qp.setType(Leader.UPTODATE);
+                    qp.setZxid(0);
+                    oa.writeRecord(qp, null);
+                    
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(proposalZxid, qp.getZxid());
+                    
+                    watcher.waitForChange();
+                    Assert.assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
+                    
+                    // check and make sure the change is persisted
+                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
+                    Assert.assertEquals(proposalZxid, lastZxid);
+                } finally {
+                    recursiveDelete(tmpDir);
+                }
+                
+            }
+
+            private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
+                qp.setType(Leader.PROPOSAL);
+                qp.setZxid(zxid);
+                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
+                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, null);
+                boa.writeRecord(sdt, null);
+                qp.setData(baos.toByteArray());
+            }
+        });
+    }
+    
     @Test
     public void testNormalRun() throws Exception {
-        testConversation(new LeaderConversation() {
+        testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
+                Assert.assertEquals(0, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000);
                 byte liBytes[] = new byte[12];
@@ -284,20 +493,30 @@ public class Zab1_0Test {
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
                         liBytes, null);
                 oa.writeRecord(qp, null);
+                
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.LEADERINFO, qp.getType());
                 Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                 Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
                         0x10000);
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
                 qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
                 oa.writeRecord(qp, null);
+                
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.DIFF, qp.getType());
+               
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.NEWLEADER, qp.getType());
                 Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(1, l.self.getCurrentEpoch());
+                
                 qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
                 oa.writeRecord(qp, null);
+
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.UPTODATE, qp.getType());
             }
@@ -306,7 +525,7 @@ public class Zab1_0Test {
     
     @Test
     public void testLeaderBehind() throws Exception {
-        testConversation(new LeaderConversation() {
+        testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
                 /* we test a normal run. everything should work out well. */
@@ -346,7 +565,7 @@ public class Zab1_0Test {
      */
     @Test
     public void testAbandonBeforeACKEpoch() throws Exception {
-        testConversation(new LeaderConversation() {
+        testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException, InterruptedException {
                 /* we test a normal run. everything should work out well. */            

@@ -392,6 +611,33 @@ public class Zab1_0Test {
         LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
         return new Leader(peer, zk);
     }
+
+    static class ConversableFollower extends Follower {
+
+        ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
+            super(self, zk);
+        }
+
+        InetSocketAddress leaderAddr;
+        public void setLeaderSocketAddress(InetSocketAddress addr) {
+            leaderAddr = addr;
+        }
+        
+        @Override
+        protected InetSocketAddress findLeader() {
+            return leaderAddr;
+        }
+    }
+    private ConversableFollower createFollower(File tmpDir, QuorumPeer peer)
+    throws IOException {
+        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(logFactory);
+        ZKDatabase zkDb = new ZKDatabase(logFactory);
+        FollowerZooKeeperServer zk = new FollowerZooKeeperServer(logFactory, peer, zkDb);
+        peer.setZKDatabase(zkDb);
+        return new ConversableFollower(peer, zk);
+    }
+
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
             FileNotFoundException {
         QuorumPeer peer = new QuorumPeer();

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1198053&r1=1198052&r2=1198053&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sat Nov  5 20:57:13 2011
@@ -108,13 +108,13 @@ public class FollowerResyncConcurrencyTe
         Thread mytestfooThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                for(int i = 0; i < 1000; i++) {
+                for(int i = 0; i < 3000; i++) {
                     zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
                         @Override
                         public void processResult(int rc, String path, Object ctx, String name) {
                             counter++;
-                            if(counter == 14200){
+                            if(counter == 16200){
                                 sem.release();
                             }
                         }
@@ -137,7 +137,7 @@ public class FollowerResyncConcurrencyTe
                 @Override
                 public void processResult(int rc, String path, Object ctx, String name) {
                     counter++;
-                    if(counter == 14200){
+                    if(counter == 16200){
                         sem.release();
                     }
                 }
@@ -149,10 +149,10 @@ public class FollowerResyncConcurrencyTe
             }
             if(i == 12000){
                 //Restart off of snap, then get some txns for a log, then shut down
-                qu.restart(index);
-                Thread.sleep(300);
-                qu.shutdown(index);
                 mytestfooThread.start();
+                qu.restart(index);
+                Thread.sleep(300);                
+                qu.shutdown(index);                
                 Thread.sleep(300);
                 qu.restart(index);
                 LOG.info("Setting up server: " + index);
@@ -166,7 +166,7 @@ public class FollowerResyncConcurrencyTe
                     @Override
                     public void processResult(int rc, String path, Object ctx, String name) {
                         counter++;
-                        if(counter == 14200){
+                        if(counter == 16200){
                             sem.release();
                         }
                     }



Mime
View raw message