flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [4/5] flink git commit: [FLINK-4057] Checkpoint Metrics
Date Fri, 01 Jul 2016 13:09:31 GMT
[FLINK-4057] Checkpoint Metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e540daf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e540daf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e540daf

Branch: refs/heads/master
Commit: 9e540daf6e44c386ca82e6818f87d634be316e6c
Parents: 8829f97
Author: zentol <chesnay@apache.org>
Authored: Wed Jun 15 13:19:56 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Jul 1 15:09:16 2016 +0200

----------------------------------------------------------------------
 .../stats/SimpleCheckpointStatsTracker.java     |  22 +++-
 .../flink/runtime/jobmanager/JobManager.scala   |  15 ++-
 .../stats/SimpleCheckpointStatsTrackerTest.java |  11 +-
 .../jobmanager/JobManagerMetricTest.java        | 117 +++++++++++++++++++
 4 files changed, 157 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 19a8fe4..9d47457 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.stats;
 
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -109,7 +111,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 
 	public SimpleCheckpointStatsTracker(
 			int historySize,
-			List<ExecutionJobVertex> tasksToWaitFor) {
+			List<ExecutionJobVertex> tasksToWaitFor,
+			MetricGroup metrics) {
 
 		checkArgument(historySize >= 0);
 		this.historySize = historySize;
@@ -124,6 +127,9 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 		} else {
 			taskParallelism = Collections.emptyMap();
 		}
+
+		metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge());
+		metrics.gauge("lastCheckpointDuration", new CheckpointDurationGauge());
 	}
 
 	@Override
@@ -411,4 +417,18 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 			return averageStateSize;
 		}
 	}
+
+	private class CheckpointSizeGauge implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize();
+		}
+	}
+
+	private class CheckpointDurationGauge implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getDuration();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 852b238..348282c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -36,7 +36,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalCon
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry}
-import org.apache.flink.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.metrics.groups.{JobManagerMetricGroup, UnregisteredMetricsGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
@@ -1224,11 +1224,21 @@ class JobManager(
             if (isStatsDisabled) {
               new DisabledCheckpointStatsTracker()
             } else {
+
+              val jobMetrics = jobManagerMetricGroup match {
+                case Some(group) =>
+                  group.addJob(jobGraph.getJobID, jobGraph.getName) match {
+                    case (jobGroup:Any) => jobGroup
+                    case null => new UnregisteredMetricsGroup()
+                  }
+                case None =>
+                  new UnregisteredMetricsGroup()
+              }
               val historySize: Int = flinkConfiguration.getInteger(
                 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
                 ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-              new SimpleCheckpointStatsTracker(historySize, ackVertices)
+              new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics)
             }
 
           val jobParallelism = jobGraph.getSerializedExecutionConfig
@@ -1655,6 +1665,7 @@ class JobManager(
       case t: Throwable =>
         log.error(s"Could not properly unregister job $jobID form the library cache.", t)
     }
+    jobManagerMetricGroup.map(_.removeJob(jobID))
 
     futureOption
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index e8ffb08..9265ab1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -54,7 +55,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	@Test
 	public void testNoCompletedCheckpointYet() throws Exception {
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
-				0, Collections.<ExecutionJobVertex>emptyList());
+				0, Collections.<ExecutionJobVertex>emptyList(), new UnregisteredMetricsGroup());
 
 		assertFalse(tracker.getJobStats().isDefined());
 		assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
@@ -64,7 +65,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	public void testRandomStats() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
 		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
 
 		for (int i = 0; i < checkpoints.length; i++) {
 			CompletedCheckpoint checkpoint = checkpoints[i];
@@ -80,7 +81,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	public void testIllegalOperatorId() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
 		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
 
 		for (CompletedCheckpoint checkpoint : checkpoints) {
 			tracker.onCompletedCheckpoint(checkpoint);
@@ -95,7 +96,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	public void testCompletedCheckpointReordering() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
 		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
 
 		// First the second checkpoint notifies
 		tracker.onCompletedCheckpoint(checkpoints[1]);
@@ -115,7 +116,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
 		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
 
 		tracker.onCompletedCheckpoint(checkpoints[0]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
new file mode 100644
index 0000000..5759888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+	/**
+	 * Tests that metrics registered on the JobManager are actually accessible.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testJobManagerMetricAccess() throws Exception {
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		Configuration flinkConfiguration = new Configuration();
+
+		flinkConfiguration.setString(KEY_METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
+
+		TestingCluster flink = new TestingCluster(flinkConfiguration);
+
+		try {
+			flink.start();
+
+			JobVertex sourceJobVertex = new JobVertex("Source");
+			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
+			jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+				Collections.<JobVertexID>emptyList(),
+				Collections.<JobVertexID>emptyList(),
+				Collections.<JobVertexID>emptyList(),
+				500, 500, 50, 5));
+
+			flink.waitForActorsToBeAlive();
+
+			flink.submitJobDetached(jobGraph);
+
+			Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft())
+				.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
+			Await.ready(jobRunning, deadline.timeLeft());
+
+			MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+			ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
+			assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value"));
+
+			Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
+				.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
+
+			BlockingInvokable.unblock();
+
+			// wait til the job has finished
+			Await.ready(jobFinished, deadline.timeLeft());
+		} finally {
+			flink.stop();
+		}
+	}
+
+	public static class BlockingInvokable extends AbstractInvokable {
+		private static boolean blocking = true;
+		private static final Object lock = new Object();
+
+		@Override
+		public void invoke() throws Exception {
+			while (blocking) {
+				synchronized (lock) {
+					lock.wait();
+				}
+			}
+		}
+
+		public static void unblock() {
+			blocking = false;
+
+			synchronized (lock) {
+				lock.notifyAll();
+			}
+		}
+	}
+}


Mime
View raw message