flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [hotfix] [zk] Add logging for connection state changes in ZooKeeperLeaderElection/RetrievalService
Date Thu, 21 Apr 2016 17:06:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4b06b01ea -> 357483416


[hotfix] [zk] Add logging for connection state changes in ZooKeeperLeaderElection/RetrievalService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35748341
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35748341
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35748341

Branch: refs/heads/master
Commit: 3574834166531f5daf8ea21a0bf63e2d9d0195a4
Parents: 4b06b01
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Apr 21 18:28:49 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Apr 21 18:28:49 2016 +0200

----------------------------------------------------------------------
 .../ZooKeeperLeaderElectionService.java         | 29 ++++++++++++++++++++
 .../ZooKeeperLeaderRetrievalService.java        | 28 +++++++++++++++++++
 2 files changed, 57 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35748341/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 5c10293..3abd71c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -25,6 +25,8 @@ import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -67,6 +69,13 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService,
Le
 
 	private final Object lock = new Object();
 
+	private final ConnectionStateListener listener = new ConnectionStateListener() {
+		@Override
+		public void stateChanged(CuratorFramework client, ConnectionState newState) {
+			handleStateChange(newState);
+		}
+	};
+
 	/**
 	 * Creates a ZooKeeperLeaderElectionService object.
 	 *
@@ -105,12 +114,16 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService,
Le
 
 		cache.getListenable().addListener(this);
 		cache.start();
+
+		client.getConnectionStateListenable().addListener(listener);
 	}
 
 	@Override
 	public void stop() throws Exception{
 		LOG.info("Stopping ZooKeeperLeaderElectionService.");
 
+		client.getConnectionStateListenable().removeListener(listener);
+
 		cache.close();
 		leaderLatch.close();
 		client.close();
@@ -316,4 +329,20 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService,
Le
 							"ZooKeeper.", e));
 		}
 	}
+
+	protected void handleStateChange(ConnectionState newState) {
+		switch (newState) {
+			case CONNECTED:
+				LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
+			case SUSPENDED:
+				LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getAddress()
+					+ "no longer participates in the leader election.");
+			case RECONNECTED:
+				LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
+			case LOST:
+				// Maybe we have to throw an exception here to terminate the JobManager
+				LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getAddress()
+					+ "no longer participates in the leader election.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35748341/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index d17133a..6c2dbef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -23,6 +23,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +55,13 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 	private String lastLeaderAddress;
 	private UUID lastLeaderSessionID;
 
+	private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
+		@Override
+		public void stateChanged(CuratorFramework client, ConnectionState newState) {
+			handleStateChange(newState);
+		}
+	};
+
 	/**
 	 * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
 	 *
@@ -76,12 +85,16 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 
 		cache.getListenable().addListener(this);
 		cache.start();
+
+		client.getConnectionStateListenable().addListener(connectionStateListener);
 	}
 
 	@Override
 	public void stop() throws Exception {
 		LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
 
+		client.getConnectionStateListenable().removeListener(connectionStateListener);
+
 		cache.close();
 		client.close();
 	}
@@ -130,4 +143,19 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 			throw e;
 		}
 	}
+
+	protected void handleStateChange(ConnectionState newState) {
+		switch (newState) {
+			case CONNECTED:
+				LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
+			case SUSPENDED:
+				LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from "
+
+					"ZooKeeper.");
+			case RECONNECTED:
+				LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
+			case LOST:
+				LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
+					"ZooKeeper.");
+		}
+	}
 }


Mime
View raw message