flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-2616] [test-stability] Fixes ZooKeeperLeaderElectionTest.testMultipleLeaders by introducing a second retrieval service to retrieve the leader address after the faulty address has been written.
Date Thu, 24 Sep 2015 12:57:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0b23b5ea1 -> 0d19e94a1


[FLINK-2616] [test-stability] Fixes ZooKeeperLeaderElectionTest.testMultipleLeaders by introducing
a second retrieval service to retrieve the leader address after the faulty address has been
written.

This closes #1173.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d19e94a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d19e94a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d19e94a

Branch: refs/heads/master
Commit: 0d19e94a18a334b6b5843c3ea81e20af096d7f35
Parents: 0b23b5e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Sep 23 14:34:38 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Sep 24 14:56:11 2015 +0200

----------------------------------------------------------------------
 .../StandaloneLeaderElectionTest.java           |  2 +-
 .../runtime/leaderelection/TestingListener.java | 27 ----------------
 .../ZooKeeperLeaderElectionTest.java            | 33 +++++++++++---------
 tools/log4j-travis.properties                   | 11 ++++---
 4 files changed, 25 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
index 86401bc..b04be63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
@@ -48,7 +48,7 @@ public class StandaloneLeaderElectionTest extends TestLogger {
 			assertTrue(contender.isLeader());
 			assertEquals(null, contender.getLeaderSessionID());
 
-			testingListener.waitForLeader(1000l);
+			testingListener.waitForNewLeader(1000l);
 
 			assertEquals(TEST_URL, testingListener.getAddress());
 			assertEquals(null, testingListener.getLeaderSessionID());

http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
index 7b3d06f..54ee822 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
@@ -47,33 +47,6 @@ public class TestingListener implements LeaderRetrievalListener {
 		return leaderSessionID;
 	}
 
-	public void clear() {
-		address = null;
-		leaderSessionID = null;
-	}
-
-	public void waitForLeader(long timeout) throws Exception {
-		long start = System.currentTimeMillis();
-		long curTimeout;
-
-		while (exception == null && address == null && (curTimeout = timeout - System.currentTimeMillis() + start) > 0) {
-			synchronized (lock) {
-				try {
-					lock.wait(curTimeout);
-				} catch (InterruptedException e) {
-					// we got interrupted so check again for the condition
-				}
-			}
-		}
-
-		if (exception != null) {
-			throw exception;
-		} else if (address == null) {
-			throw new TimeoutException("Listener was not notified about a leader within " +
-					timeout + "ms");
-		}
-	}
-
 	public void waitForNewLeader(long timeout) throws Exception {
 		long start = System.currentTimeMillis();
 		long curTimeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 7c7867a..34a582f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -105,7 +105,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			assertTrue(contender.isLeader());
 			assertEquals(leaderElectionService.getLeaderSessionID(), contender.getLeaderSessionID());
 
-			listener.waitForLeader(timeout.toMillis());
+			listener.waitForNewLeader(timeout.toMillis());
 
 			assertEquals(TEST_URL, listener.getAddress());
 			assertEquals(leaderElectionService.getLeaderSessionID(), listener.getLeaderSessionID());
@@ -226,7 +226,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			Pattern regex = Pattern.compile(pattern);
 
 			for (int i = 0; i < numTries; i++) {
-				listener.waitForLeader(timeout.toMillis());
+				listener.waitForNewLeader(timeout.toMillis());
 
 				String address = listener.getAddress();
 
@@ -238,9 +238,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 					assertEquals(listener.getLeaderSessionID(), contenders[index].getLeaderSessionID());
 
-					// clear the current leader of the listener
-					listener.clear();
-
 					// stop leader election service = revoke leadership
 					leaderElectionService[index].stop();
 					// create new leader election service which takes part in the leader election
@@ -285,25 +282,26 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
+		ZooKeeperLeaderRetrievalService leaderRetrievalService2 = null;
 		TestingListener listener = new TestingListener();
+		TestingListener listener2 = new TestingListener();
 		TestingContender contender;
 
 		try {
 			leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
 			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+			leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(configuration);
 
 			contender = new TestingContender(TEST_URL, leaderElectionService);
 
 			leaderElectionService.start(contender);
 			leaderRetrievalService.start(listener);
 
-			listener.waitForLeader(timeout.toMillis());
+			listener.waitForNewLeader(timeout.toMillis());
 
 			assertEquals(listener.getLeaderSessionID(), contender.getLeaderSessionID());
 			assertEquals(TEST_URL, listener.getAddress());
 
-			listener.clear();
-
 			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
 
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -329,15 +327,16 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 				}
 			}
 
-			listener.waitForLeader(timeout.toMillis());
+			leaderRetrievalService2.start(listener2);
+
+			listener2.waitForNewLeader(timeout.toMillis());
 
-			if (FAULTY_CONTENDER_URL.equals(listener.getAddress())) {
-				listener.clear();
-				listener.waitForLeader(timeout.toMillis());
+			if (FAULTY_CONTENDER_URL.equals(listener2.getAddress())) {
+				listener2.waitForNewLeader(timeout.toMillis());
 			}
 
-			assertEquals(listener.getLeaderSessionID(), contender.getLeaderSessionID());
-			assertEquals(listener.getAddress(), contender.getAddress());
+			assertEquals(listener2.getLeaderSessionID(), contender.getLeaderSessionID());
+			assertEquals(listener2.getAddress(), contender.getAddress());
 
 		} finally {
 			if (leaderElectionService != null) {
@@ -347,6 +346,10 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			if (leaderRetrievalService != null) {
 				leaderRetrievalService.stop();
 			}
+
+			if (leaderRetrievalService2 != null) {
+				leaderRetrievalService2.stop();
+			}
 		}
 	}
 
@@ -474,7 +477,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			leaderRetrievalService.start(listener);
 
 			try {
-				listener.waitForLeader(1000);
+				listener.waitForNewLeader(1000);
 
 				fail("TimeoutException was expected because there is no leader registered and " +
 						"thus there shouldn't be any leader information in ZooKeeper.");

http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 1cdd152..53379b4 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -35,11 +35,12 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
-log4j.logger.org.apache.zookeeper=ERROR, file
-log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF, file
-log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG,file
-log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG,file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF
+log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG
+log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
+
 # Log a bit when running the flink-yarn-tests to avoid running into the 5 minutes timeout for
 # the tests
 log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console


Mime
View raw message