flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed
Date Wed, 05 Apr 2017 21:26:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master dc13500dd -> dabb0bac0


[FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dabb0bac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dabb0bac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dabb0bac

Branch: refs/heads/master
Commit: dabb0bac0f724d50dcab5b3b767f38dc5feeb407
Parents: dc13500
Author: zentol <chesnay@apache.org>
Authored: Fri Mar 24 19:39:31 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Wed Apr 5 23:18:08 2017 +0200

----------------------------------------------------------------------
 .../groups/TaskManagerJobMetricGroup.java       | 25 ++++++++++++--------
 .../apache/flink/runtime/taskmanager/Task.java  |  9 +++++++
 2 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dabb0bac/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 1ac8140..79a87d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -72,16 +72,21 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG
 
 		synchronized (this) {
 			if (!isClosed()) {
-				TaskMetricGroup task = new TaskMetricGroup(
-					registry,
-					this,
-					jobVertexId,
-					executionAttemptID,
-					taskName,
-					subtaskIndex,
-					attemptNumber);
-				tasks.put(executionAttemptID, task);
-				return task;
+				TaskMetricGroup prior = tasks.get(executionAttemptID);
+				if (prior != null) {
+					return prior;
+				} else {
+					TaskMetricGroup task = new TaskMetricGroup(
+						registry,
+						this,
+						jobVertexId,
+						executionAttemptID,
+						taskName,
+						subtaskIndex,
+						attemptNumber);
+					tasks.put(executionAttemptID, task);
+					return task;
+				}
 			} else {
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/dabb0bac/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b0f0eb8..ef934de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,16 +526,25 @@ public class Task implements Runnable, TaskActions {
 			else if (current == ExecutionState.FAILED) {
 				// we were immediately failed. tell the TaskManager that we reached our final state
 				notifyFinalState();
+				if (metrics != null) {
+					metrics.close();
+				}
 				return;
 			}
 			else if (current == ExecutionState.CANCELING) {
 				if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
 					// we were immediately canceled. tell the TaskManager that we reached our final state
 					notifyFinalState();
+					if (metrics != null) {
+						metrics.close();
+					}
 					return;
 				}
 			}
 			else {
+				if (metrics != null) {
+					metrics.close();
+				}
 				throw new IllegalStateException("Invalid state for beginning of operation of task " +
this + '.');
 			}
 		}


Mime
View raw message