flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/7] flink git commit: [FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore
Date Wed, 30 Nov 2016 15:22:23 GMT
[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d949c96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d949c96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d949c96

Branch: refs/heads/release-1.1
Commit: 8d949c966b8916213e9b57996aaba0fe7c0e13fa
Parents: ee478fe
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/SubmittedJobGraph.java   |  2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        | 87 +++++++++++++-------
 2 files changed, 57 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d949c96/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
 
 	@Override
 	public String toString() {
-		return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+		return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d949c96/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a1dd14b..7324c07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 */
 	private final PathChildrenCache pathCache;
 
+	/** The full configured base path including the namespace. */
+	private final String zooKeeperFullBasePath;
+
 	/** The external listener to be notified on races. */
 	private SubmittedJobGraphListener jobGraphListener;
 
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
+		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
 		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
+			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
+
 			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
 
 			while (true) {
@@ -168,6 +174,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 
+			LOG.info("Found {} job graphs.", submitted.size());
+
 			if (submitted.size() != 0) {
 				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
 
@@ -195,6 +203,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
@@ -221,6 +231,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobGraph, "Job graph");
 		String path = getPathForJob(jobGraph.getJobId());
 
+		LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
 		boolean success = false;
 
 		while (!success) {
@@ -235,8 +247,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 						addedJobGraphs.add(jobGraph.getJobId());
 
-						LOG.info("Added {} to ZooKeeper.", jobGraph);
-
 						success = true;
 					}
 					catch (KeeperException.NodeExistsException ignored) {
@@ -258,6 +268,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 		}
+
+		LOG.info("Added {} to ZooKeeper.", jobGraph);
 	}
 
 	@Override
@@ -265,14 +277,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
 				jobGraphsInZooKeeper.removeAndDiscardState(path);
 
 				addedJobGraphs.remove(jobId);
-				LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 			}
 		}
+
+		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
 	/**
@@ -297,70 +312,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			}
 
 			switch (event.getType()) {
-				case CHILD_ADDED:
+				case CHILD_ADDED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
 								try {
 									// Whoa! This has been added by someone else. Or we were fast
 									// to remove it (false positive).
 									jobGraphListener.onAddedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
+				}
+				break;
 
-					break;
-
-				case CHILD_UPDATED:
+				case CHILD_UPDATED: {
 					// Nothing to do
-					break;
+				}
+				break;
+
+				case CHILD_REMOVED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
 
-				case CHILD_REMOVED:
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
 								try {
 									// Oh oh. Someone else removed one of our job graphs. Mean!
 									jobGraphListener.onRemovedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
 
 							break;
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
-					break;
+				}
+				break;
 
-				case CONNECTION_SUSPENDED:
+				case CONNECTION_SUSPENDED: {
 					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
-							"graphs are not monitored (temporarily).");
-					break;
-				case CONNECTION_LOST:
+						"graphs are not monitored (temporarily).");
+				}
+				break;
+
+				case CONNECTION_LOST: {
 					LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
-							"graphs are not monitored (permanently).");
-					break;
+						"graphs are not monitored (permanently).");
+				}
+				break;
 
-				case CONNECTION_RECONNECTED:
+				case CONNECTION_RECONNECTED: {
 					LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
-							"graphs are monitored again.");
-					break;
-				case INITIALIZED:
+						"graphs are monitored again.");
+				}
+				break;
+
+				case INITIALIZED: {
 					LOG.info("SubmittedJobGraphsPathCacheListener initialized");
-					break;
+				}
+				break;
 			}
 		}
 


Mime
View raw message