zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cami...@apache.org
Subject svn commit: r1190352 - in /zookeeper/branches/branch-3.4: CHANGES.txt src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java src/java/test/org/apache/zookeeper/test/QuorumTest.java
Date Fri, 28 Oct 2011 14:29:03 GMT
Author: camille
Date: Fri Oct 28 14:29:03 2011
New Revision: 1190352

URL: http://svn.apache.org/viewvc?rev=1190352&view=rev
Log:
ZOOKEEPER-1264: FollowerResyncConcurrencyTest failing intermittently (phunt via camille)

Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1190352&r1=1190351&r2=1190352&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Fri Oct 28 14:29:03 2011
@@ -347,6 +347,8 @@ BUGFIXES: 
 
   ZOOKEEPER-1181. Fix problems with Kerberos TGT renewal.
   (Eugene Koontz via mahadev)
+  
+  ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing intermittently. (phunt via camille)
 
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1190352&r1=1190351&r2=1190352&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Fri Oct 28 14:29:03 2011
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
@@ -43,12 +44,11 @@ import org.junit.Test;
 
 
 public class FollowerResyncConcurrencyTest extends ZKTestCase {
-    volatile int counter = 0;
-    volatile int errors = 0; 
-
     private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
     public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
 
+    private volatile int counter = 0;
+    private volatile int errors = 0; 
 
     /**
      * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
@@ -64,8 +64,9 @@ public class FollowerResyncConcurrencyTe
      * @throws KeeperException
      */
     @Test
-    public void testResyncBySnapThenDiffAfterFollowerCrashes () 
-    throws IOException, InterruptedException, KeeperException,  Throwable{
+    public void testResyncBySnapThenDiffAfterFollowerCrashes() 
+        throws IOException, InterruptedException, KeeperException,  Throwable
+    {
         final Semaphore sem = new Semaphore(0);
 
         QuorumUtil qu = new QuorumUtil(1);
@@ -75,31 +76,36 @@ public class FollowerResyncConcurrencyTe
         CountdownWatcher watcher3 = new CountdownWatcher();
 
         int index = 1;
-        while(qu.getPeer(index).peer.leader == null)
+        while(qu.getPeer(index).peer.leader == null) {
             index++;
+        }
 
         Leader leader = qu.getPeer(index).peer.leader;
-
         assertNotNull(leader);    
-        /*
-         * Reusing the index variable to select a follower to connect to
-         */
+
+        /* Reusing the index variable to select a follower to connect to */
         index = (index == 1) ? 2 : 1;
+        LOG.info("Connecting to follower:" + index);
+
         qu.shutdown(index);
-        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(),
1000,watcher3);
-        watcher3.waitForConnected(CONNECTION_TIMEOUT);
+
+        final ZooKeeper zk3 =
+            createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+        LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
         zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 
         qu.restart(index);
-        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
1000, watcher1);
-
-        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
1000, watcher2);
-    
-        watcher1.waitForConnected(CONNECTION_TIMEOUT);
-        watcher2.waitForConnected(CONNECTION_TIMEOUT);
+        final ZooKeeper zk1 =
+            createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
+        LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
+
+        final ZooKeeper zk2 =
+            createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
+        LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
         
-        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     
-        Thread t = new Thread(new Runnable() {
+        zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Thread mytestfooThread = new Thread(new Runnable() {
 
             @Override
             public void run() {
@@ -115,8 +121,6 @@ public class FollowerResyncConcurrencyTe
                             if(counter == 14200){
                                 sem.release();
                             }
-
-
                         }
                     }, null);
                     if(i%10==0){
@@ -131,7 +135,6 @@ public class FollowerResyncConcurrencyTe
             }
         });
 
-        
         for(int i = 0; i < 13000; i++) {
             zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
 
@@ -144,8 +147,6 @@ public class FollowerResyncConcurrencyTe
                     if(counter == 14200){
                         sem.release();
                     }
-
-
                 }
             }, null);            
 
@@ -158,7 +159,7 @@ public class FollowerResyncConcurrencyTe
                 qu.restart(index);       
                 Thread.sleep(300);
                 qu.shutdown(index);
-                t.start();
+                mytestfooThread.start();
                 Thread.sleep(300);                
                 qu.restart(index);
                 LOG.info("Setting up server: " + index);
@@ -169,7 +170,6 @@ public class FollowerResyncConcurrencyTe
 
             if(i%50 == 0) {
                 zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
-
                     @Override
                     public void processResult(int rc, String path, Object ctx, String name)
{
                         counter++;
@@ -179,22 +179,28 @@ public class FollowerResyncConcurrencyTe
                         if(counter == 14200){
                             sem.release();
                         }
-
-
                     }
                 }, null);
             }
         }
 
         // Wait until all updates return
-        if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
+        if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
             LOG.warn("Did not aquire semaphore fast enough");
         }
-        t.join(10000);
+        mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
+        if (mytestfooThread.isAlive()) {
+            LOG.error("mytestfooThread is still alive");
+        }
         Thread.sleep(1000);
         
-            verifyState(qu, index, leader);
+        verifyState(qu, index, leader);
         
+        zk1.close();
+        zk2.close();
+        zk3.close();
+        
+        qu.shutdownAll();
     }      
     
     /**
@@ -222,8 +228,9 @@ public class FollowerResyncConcurrencyTe
      */
 
     @Test
-    public void testResyncByDiffAfterFollowerCrashes () 
-    throws IOException, InterruptedException, KeeperException, Throwable{
+    public void testResyncByDiffAfterFollowerCrashes() 
+        throws IOException, InterruptedException, KeeperException, Throwable
+    {
         final Semaphore sem = new Semaphore(0);
 
         QuorumUtil qu = new QuorumUtil(1);
@@ -232,34 +239,35 @@ public class FollowerResyncConcurrencyTe
         CountdownWatcher watcher2 = new CountdownWatcher();
         CountdownWatcher watcher3 = new CountdownWatcher();
 
-
         int index = 1;
-        while(qu.getPeer(index).peer.leader == null)
+        while(qu.getPeer(index).peer.leader == null) {
             index++;
+        }
 
         Leader leader = qu.getPeer(index).peer.leader;
-
         assertNotNull(leader);
 
-        /*
-         * Reusing the index variable to select a follower to connect to
-         */
+        /* Reusing the index variable to select a follower to connect to */
         index = (index == 1) ? 2 : 1;
+        LOG.info("Connecting to follower:" + index);
+
+        final ZooKeeper zk1 =
+            createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
+        LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
 
-        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
1000, watcher1);
+        final ZooKeeper zk2 =
+            createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
+        LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
 
-        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
1000,watcher2);
-        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(),
1000, watcher3);
-        watcher1.waitForConnected(CONNECTION_TIMEOUT);
-        watcher2.waitForConnected(CONNECTION_TIMEOUT);
-        watcher3.waitForConnected(CONNECTION_TIMEOUT);
-        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     
+        final ZooKeeper zk3 =
+            createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+        LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
+        zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 
-        
         final AtomicBoolean runNow = new AtomicBoolean(false);
-        Thread t = new Thread(new Runnable() {
-
+        Thread mytestfooThread = new Thread(new Runnable() {
             @Override
             public void run() {                                
                 int inSyncCounter = 0;
@@ -276,8 +284,6 @@ public class FollowerResyncConcurrencyTe
                                 if(counter > 7300){
                                     sem.release();
                                 }
-
-
                             }
                         }, null);
                         
@@ -286,8 +292,7 @@ public class FollowerResyncConcurrencyTe
                         } catch (Exception e) {
                         }
                         inSyncCounter++;
-                    }
-                    else {
+                    } else {
                         Thread.yield();
                     }
                 }
@@ -295,7 +300,7 @@ public class FollowerResyncConcurrencyTe
             }
         });
 
-        t.start();
+        mytestfooThread.start();
         for(int i = 0; i < 5000; i++) {
             zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
 
@@ -308,8 +313,6 @@ public class FollowerResyncConcurrencyTe
                     if(counter > 7300){
                         sem.release();
                     }
-
-
                 }
             }, null);            
 
@@ -317,7 +320,6 @@ public class FollowerResyncConcurrencyTe
                 qu.shutdown(index);      
                 Thread.sleep(1100);
                 LOG.info("Shutting down s1");
-
             }
             if(i == 1100 || i == 1150 || i == 1200) {
                 Thread.sleep(1000);
@@ -330,7 +332,6 @@ public class FollowerResyncConcurrencyTe
                 LOG.info("Setting up server: " + index);
             }
         
-
             if(i>=1000 &&  i%2== 0) {
                 zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
 
@@ -343,8 +344,6 @@ public class FollowerResyncConcurrencyTe
                         if(counter > 7300){
                             sem.release();
                         }
-
-
                     }
                 }, null);
             }
@@ -354,15 +353,35 @@ public class FollowerResyncConcurrencyTe
         }
 
         // Wait until all updates return
-        if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
+        if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
             LOG.warn("Did not aquire semaphore fast enough");
         }
-        t.join(10000);
+        mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
+        if (mytestfooThread.isAlive()) {
+            LOG.error("mytestfooThread is still alive");
+        }
+
         Thread.sleep(1000);
         // Verify that server is following and has the same epoch as the leader
         
         verifyState(qu, index, leader);
         
+        zk1.close();
+        zk2.close();
+        zk3.close();
+        
+        qu.shutdownAll();
+    }
+
+    private static DisconnectableZooKeeper createClient(int port,
+            CountdownWatcher watcher)
+        throws IOException, TimeoutException, InterruptedException
+    {
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                "127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher);
+
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        return zk;
     }
 
     private void verifyState(QuorumUtil qu, int index, Leader leader) {

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1190352&r1=1190351&r2=1190352&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java Fri Oct 28 14:29:03 2011
@@ -378,9 +378,11 @@ public class QuorumTest extends ZKTestCa
          */
         index = (index == 1) ? 2 : 1;
         
-        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
1000, new Watcher() {
-            public void process(WatchedEvent event) {
-        }});
+        ZooKeeper zk = new DisconnectableZooKeeper(
+                "127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) { }
+          });
 
         zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     
         
@@ -412,7 +414,7 @@ public class QuorumTest extends ZKTestCa
         }
 
         // Wait until all updates return
-        sem.tryAcquire(15000, TimeUnit.MILLISECONDS);
+        sem.tryAcquire(15, TimeUnit.SECONDS);
         
         // Verify that server is following and has the same epoch as the leader
         Assert.assertTrue("Not following", qu.getPeer(index).peer.follower != null);



Mime
View raw message