flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [hotfix] Move logging of JobStatus changes into the ExecutionGraph
Date Wed, 09 Nov 2016 14:03:45 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 0962cb6f4 -> 2041ba02b


[hotfix] Move logging of JobStatus changes into the ExecutionGraph

Previously, the JobManager was responsible for logging the JobStatus changes. This led to
out-of-order log messages, since the JM was merely a job status listener that was notified
via an asynchronous message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2041ba02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2041ba02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2041ba02

Branch: refs/heads/release-1.1
Commit: 2041ba02bbb5d3feac69a9b58fa2aba7405d117e
Parents: 0962cb6
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Nov 9 12:19:10 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Nov 9 14:57:34 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/ExecutionGraph.java  | 14 ++++++--------
 .../apache/flink/runtime/jobmanager/JobManager.scala  |  4 ----
 2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2041ba02/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d3b48df..c4657e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1081,9 +1081,7 @@ public class ExecutionGraph implements Serializable {
 
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error)
{
 		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} switched from {} to {}.", this.getJobName(), current, newState);
-			}
+			LOG.info("Job {} ({}) switched from state {} to {}.", jobName, jobID, current, newState,
error);
 
 			stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
 			notifyJobStatusChange(newState, error);
@@ -1162,20 +1160,20 @@ public class ExecutionGraph implements Serializable {
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Try to restart the job or fail it if no longer possible.", failureCause);
+					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", jobName,
jobID, failureCause);
 				} else {
-					LOG.info("Try to restart the job or fail it if no longer possible.");
+					LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID);
 				}
 
 				boolean isRestartable = !(failureCause instanceof SuppressRestartsException) &&
restartStrategy.canRestart();
 
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
-					LOG.info("Restarting the job...");
+					LOG.info("Restarting the job {} ({}).", jobName, jobID);
 					restartStrategy.restart(this);
 
 					return true;
 				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED,
failureCause)) {
-					LOG.info("Could not restart the job.", failureCause);
+					LOG.info("Could not restart the job {} ({}).", jobName, jobID, failureCause);
 					postRunCleanup();
 
 					return true;
@@ -1332,7 +1330,7 @@ public class ExecutionGraph implements Serializable {
 			if (execution != null) {
 				execution.setAccumulators(flinkAccumulators, userAccumulators);
 			} else {
-				LOG.warn("Received accumulator result for unknown execution {}.", execID);
+				LOG.debug("Received accumulator result for unknown execution {}.", execID);
 			}
 		} catch (Exception e) {
 			LOG.error("Cannot update accumulators for job {}.", jobID, e);

http://git-wip-us.apache.org/repos/asf/flink/blob/2041ba02/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e5b5267..106ffb6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -752,10 +752,6 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
-          log.info(
-            s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
-            error)
-
           if (newJobStatus.isGloballyTerminalState()) {
             jobInfo.end = timeStamp
 


Mime
View raw message