Author: camille
Date: Fri Oct 28 14:29:03 2011
New Revision: 1190352
URL: http://svn.apache.org/viewvc?rev=1190352&view=rev
Log:
ZOOKEEPER-1264: FollowerResyncConcurrencyTest failing intermittently (phunt via camille)
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1190352&r1=1190351&r2=1190352&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Fri Oct 28 14:29:03 2011
@@ -347,6 +347,8 @@ BUGFIXES:
ZOOKEEPER-1181. Fix problems with Kerberos TGT renewal.
(Eugene Koontz via mahadev)
+
+ ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing intermittently. (phunt via camille)
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1190352&r1=1190351&r2=1190352&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Fri Oct 28 14:29:03 2011
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
@@ -43,12 +44,11 @@ import org.junit.Test;
public class FollowerResyncConcurrencyTest extends ZKTestCase {
- volatile int counter = 0;
- volatile int errors = 0;
-
private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+ private volatile int counter = 0;
+ private volatile int errors = 0;
/**
* See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
@@ -64,8 +64,9 @@ public class FollowerResyncConcurrencyTe
* @throws KeeperException
*/
@Test
- public void testResyncBySnapThenDiffAfterFollowerCrashes ()
- throws IOException, InterruptedException, KeeperException, Throwable{
+ public void testResyncBySnapThenDiffAfterFollowerCrashes()
+ throws IOException, InterruptedException, KeeperException, Throwable
+ {
final Semaphore sem = new Semaphore(0);
QuorumUtil qu = new QuorumUtil(1);
@@ -75,31 +76,36 @@ public class FollowerResyncConcurrencyTe
CountdownWatcher watcher3 = new CountdownWatcher();
int index = 1;
- while(qu.getPeer(index).peer.leader == null)
+ while(qu.getPeer(index).peer.leader == null) {
index++;
+ }
Leader leader = qu.getPeer(index).peer.leader;
-
assertNotNull(leader);
- /*
- * Reusing the index variable to select a follower to connect to
- */
+
+ /* Reusing the index variable to select a follower to connect to */
index = (index == 1) ? 2 : 1;
+ LOG.info("Connecting to follower:" + index);
+
qu.shutdown(index);
- final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000,watcher3);
- watcher3.waitForConnected(CONNECTION_TIMEOUT);
+
+ final ZooKeeper zk3 =
+ createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+ LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
qu.restart(index);
- ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
-
- ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
-
- watcher1.waitForConnected(CONNECTION_TIMEOUT);
- watcher2.waitForConnected(CONNECTION_TIMEOUT);
+ final ZooKeeper zk1 =
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
+ LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
+
+ final ZooKeeper zk2 =
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
+ LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
- zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- Thread t = new Thread(new Runnable() {
+ zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Thread mytestfooThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -115,8 +121,6 @@ public class FollowerResyncConcurrencyTe
if(counter == 14200){
sem.release();
}
-
-
}
}, null);
if(i%10==0){
@@ -131,7 +135,6 @@ public class FollowerResyncConcurrencyTe
}
});
-
for(int i = 0; i < 13000; i++) {
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
@@ -144,8 +147,6 @@ public class FollowerResyncConcurrencyTe
if(counter == 14200){
sem.release();
}
-
-
}
}, null);
@@ -158,7 +159,7 @@ public class FollowerResyncConcurrencyTe
qu.restart(index);
Thread.sleep(300);
qu.shutdown(index);
- t.start();
+ mytestfooThread.start();
Thread.sleep(300);
qu.restart(index);
LOG.info("Setting up server: " + index);
@@ -169,7 +170,6 @@ public class FollowerResyncConcurrencyTe
if(i%50 == 0) {
zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
-
@Override
public void processResult(int rc, String path, Object ctx, String name)
{
counter++;
@@ -179,22 +179,28 @@ public class FollowerResyncConcurrencyTe
if(counter == 14200){
sem.release();
}
-
-
}
}, null);
}
}
// Wait until all updates return
- if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
+ if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
LOG.warn("Did not aquire semaphore fast enough");
}
- t.join(10000);
+ mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
+ if (mytestfooThread.isAlive()) {
+ LOG.error("mytestfooThread is still alive");
+ }
Thread.sleep(1000);
- verifyState(qu, index, leader);
+ verifyState(qu, index, leader);
+ zk1.close();
+ zk2.close();
+ zk3.close();
+
+ qu.shutdownAll();
}
/**
@@ -222,8 +228,9 @@ public class FollowerResyncConcurrencyTe
*/
@Test
- public void testResyncByDiffAfterFollowerCrashes ()
- throws IOException, InterruptedException, KeeperException, Throwable{
+ public void testResyncByDiffAfterFollowerCrashes()
+ throws IOException, InterruptedException, KeeperException, Throwable
+ {
final Semaphore sem = new Semaphore(0);
QuorumUtil qu = new QuorumUtil(1);
@@ -232,34 +239,35 @@ public class FollowerResyncConcurrencyTe
CountdownWatcher watcher2 = new CountdownWatcher();
CountdownWatcher watcher3 = new CountdownWatcher();
-
int index = 1;
- while(qu.getPeer(index).peer.leader == null)
+ while(qu.getPeer(index).peer.leader == null) {
index++;
+ }
Leader leader = qu.getPeer(index).peer.leader;
-
assertNotNull(leader);
- /*
- * Reusing the index variable to select a follower to connect to
- */
+ /* Reusing the index variable to select a follower to connect to */
index = (index == 1) ? 2 : 1;
+ LOG.info("Connecting to follower:" + index);
+
+ final ZooKeeper zk1 =
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
+ LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
- ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
+ final ZooKeeper zk2 =
+ createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
+ LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
- ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000,watcher2);
- final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
- watcher1.waitForConnected(CONNECTION_TIMEOUT);
- watcher2.waitForConnected(CONNECTION_TIMEOUT);
- watcher3.waitForConnected(CONNECTION_TIMEOUT);
- zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ final ZooKeeper zk3 =
+ createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+ LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
+ zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
final AtomicBoolean runNow = new AtomicBoolean(false);
- Thread t = new Thread(new Runnable() {
-
+ Thread mytestfooThread = new Thread(new Runnable() {
@Override
public void run() {
int inSyncCounter = 0;
@@ -276,8 +284,6 @@ public class FollowerResyncConcurrencyTe
if(counter > 7300){
sem.release();
}
-
-
}
}, null);
@@ -286,8 +292,7 @@ public class FollowerResyncConcurrencyTe
} catch (Exception e) {
}
inSyncCounter++;
- }
- else {
+ } else {
Thread.yield();
}
}
@@ -295,7 +300,7 @@ public class FollowerResyncConcurrencyTe
}
});
- t.start();
+ mytestfooThread.start();
for(int i = 0; i < 5000; i++) {
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
@@ -308,8 +313,6 @@ public class FollowerResyncConcurrencyTe
if(counter > 7300){
sem.release();
}
-
-
}
}, null);
@@ -317,7 +320,6 @@ public class FollowerResyncConcurrencyTe
qu.shutdown(index);
Thread.sleep(1100);
LOG.info("Shutting down s1");
-
}
if(i == 1100 || i == 1150 || i == 1200) {
Thread.sleep(1000);
@@ -330,7 +332,6 @@ public class FollowerResyncConcurrencyTe
LOG.info("Setting up server: " + index);
}
-
if(i>=1000 && i%2== 0) {
zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
@@ -343,8 +344,6 @@ public class FollowerResyncConcurrencyTe
if(counter > 7300){
sem.release();
}
-
-
}
}, null);
}
@@ -354,15 +353,35 @@ public class FollowerResyncConcurrencyTe
}
// Wait until all updates return
- if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
+ if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
LOG.warn("Did not aquire semaphore fast enough");
}
- t.join(10000);
+ mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
+ if (mytestfooThread.isAlive()) {
+ LOG.error("mytestfooThread is still alive");
+ }
+
Thread.sleep(1000);
// Verify that server is following and has the same epoch as the leader
verifyState(qu, index, leader);
+ zk1.close();
+ zk2.close();
+ zk3.close();
+
+ qu.shutdownAll();
+ }
+
+ private static DisconnectableZooKeeper createClient(int port,
+ CountdownWatcher watcher)
+ throws IOException, TimeoutException, InterruptedException
+ {
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+ "127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher);
+
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ return zk;
}
private void verifyState(QuorumUtil qu, int index, Leader leader) {
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1190352&r1=1190351&r2=1190352&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/QuorumTest.java Fri Oct 28 14:29:03 2011
@@ -378,9 +378,11 @@ public class QuorumTest extends ZKTestCa
*/
index = (index == 1) ? 2 : 1;
- ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, new Watcher() {
- public void process(WatchedEvent event) {
- }});
+ ZooKeeper zk = new DisconnectableZooKeeper(
+ "127.0.0.1:" + qu.getPeer(index).peer.getClientPort(),
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ public void process(WatchedEvent event) { }
+ });
zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -412,7 +414,7 @@ public class QuorumTest extends ZKTestCa
}
// Wait until all updates return
- sem.tryAcquire(15000, TimeUnit.MILLISECONDS);
+ sem.tryAcquire(15, TimeUnit.SECONDS);
// Verify that server is following and has the same epoch as the leader
Assert.assertTrue("Not following", qu.getPeer(index).peer.follower != null);
|