flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/4] flink git commit: [FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in webinterface
Date Fri, 12 Jun 2015 13:33:06 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.9 ecfde6dd9 -> f5f0709c9


[FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in webinterface

This closes #826


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e513be72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e513be72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e513be72

Branch: refs/heads/release-0.9
Commit: e513be72a486b4f2e13c617eb6d4d08c03503ae7
Parents: ecfde6d
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Jun 12 01:45:03 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Jun 12 14:26:47 2015 +0200

----------------------------------------------------------------------
 .../jobmanager/web/JobManagerInfoServlet.java   | 31 +++++++++++++++++
 .../js/jobmanagerFrontend.js                    | 36 +++++++++-----------
 .../runtime/jobmanager/MemoryArchivist.scala    | 17 +++++++++
 .../runtime/messages/ArchiveMessages.scala      | 11 +++++-
 4 files changed, 75 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 6d58306..3fc3c82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
 
+import scala.Tuple3;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -117,6 +118,20 @@ public class JobManagerInfoServlet extends HttpServlet {
 					writeJsonForArchive(resp.getWriter(), archivedJobs);
 				}
 			}
+			else if("jobcounts".equals(req.getParameter("get"))) {
+				response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
+						new Timeout(timeout));
+
+				result = Await.result(response, timeout);
+
+				if(!(result instanceof Tuple3)) {
+					throw new RuntimeException("RequestJobCounts requires a response of type " +
+							"Tuple3. Instead the response is of type " + result.getClass() +
+							".");
+				} else {
+					writeJsonForJobCounts(resp.getWriter(), (Tuple3)result);
+				}
+			}
 			else if("job".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 
@@ -341,6 +356,22 @@ public class JobManagerInfoServlet extends HttpServlet {
 	}
 
 	/**
+	 * Writes Json with the job counts
+	 *
+	 * @param wrt
+	 * @param jobCounts
+	 */
+	private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, Integer, Integer> jobCounts) {
+
+		wrt.write("{");
+		wrt.write("\"finished\": " + jobCounts._1() + ",");
+		wrt.write("\"canceled\": " + jobCounts._2() + ",");
+		wrt.write("\"failed\": "   + jobCounts._3());
+		wrt.write("}");
+
+	}
+
+	/**
 	 * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
 	 *
 	 * @param wrt

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
index 92f6979..63d287c 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
@@ -81,6 +81,22 @@ function poll(jobId) {
 })();
 
 /*
+ * Polls the job execution counts on page load and every 2 seconds
+ */
+(function pollJobCounts() {
+	$.ajax({ url : "jobsInfo?get=jobcounts", cache: false, type : "GET",
+	    success : function(json) {
+
+		$("#jobs-finished").html(json.finished);
+		$("#jobs-canceled").html(json.canceled);
+		$("#jobs-failed").html(json.failed);
+
+	    }, dataType : "json",
+	});
+	setTimeout(pollJobCounts, 2000);
+})();
+
+/*
  * Polls the number of taskmanagers on page load
  */
 (function pollTaskmanagers() {
@@ -418,20 +434,12 @@ function updateTable(json) {
 	}
 }
 
-var archive_finished = 0;
-var archive_failed = 0;
-var archive_canceled = 0;
-
 /*
  * Creates job history table
  */
 function fillTableArchive(table, json) {
 	$(table).html("");
-	
-	$("#jobs-finished").html(archive_finished);
-	$("#jobs-failed").html(archive_failed);
-	$("#jobs-canceled").html(archive_canceled);
-	
+
 	$.each(json, function(i, job) {
 		_fillTableArchive(table, job, false)
 	});
@@ -459,14 +467,4 @@ function _fillTableArchive(table, job, prepend) {
 						+ job.jobname + " ("
 						+ formattedTimeFromTimestamp(parseInt(job.time))
 						+ ")</a></li>");
-	if (job.status == "FINISHED")
-		archive_finished++;
-	if (job.status == "FAILED")
-		archive_failed++;
-	if (job.status == "CANCELED")
-		archive_canceled++;
-	
-	$("#jobs-finished").html(archive_finished);
-	$("#jobs-failed").html(archive_failed);
-	$("#jobs-canceled").html(archive_canceled);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 62ea435..54d2f2f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.Actor
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
 import org.apache.flink.runtime.messages.ArchiveMessages._
@@ -45,6 +46,8 @@ import scala.collection.mutable
  *  then a [[CurrentJobStatus]] message with the last state is returned to the sender, otherwise
  *  a [[JobNotFound]] message is returned
  *
+ *  - [[RequestJobCounts]] returns the number of finished, canceled, and failed jobs as a Tuple3
+ *
  * @param max_entries Maximum number of stored Flink jobs
  */
 class MemoryArchivist(private val max_entries: Int)
@@ -57,12 +60,23 @@ class MemoryArchivist(private val max_entries: Int)
    */
   val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
 
+  /* Counters for finished, canceled, and failed jobs */
+  var finishedCnt: Int = 0
+  var canceledCnt: Int = 0
+  var failedCnt: Int = 0
+
   override def receiveWithLogMessages: Receive = {
     
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => 
       // wrap graph inside a soft reference
       graphs.update(jobID, graph)
+      // update job counters
+      graph.getState match {
+        case JobStatus.FINISHED => finishedCnt += 1
+        case JobStatus.CANCELED => canceledCnt += 1
+        case JobStatus.FAILED => failedCnt += 1
+      }
       trimHistory()
 
     case RequestArchivedJob(jobID: JobID) =>
@@ -83,6 +97,9 @@ class MemoryArchivist(private val max_entries: Int)
         case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
         case None => sender ! JobNotFound(jobID)
       }
+
+    case RequestJobCounts =>
+      sender ! (finishedCnt, canceledCnt, failedCnt)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index e9e7dec..c4e3f3e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -34,6 +34,11 @@ object ArchiveMessages {
   case object RequestArchivedJobs
 
   /**
+   * Requests the number of finished, canceled, and failed jobs
+   */
+  case object RequestJobCounts
+
+  /**
   * Request a specific ExecutionGraph by JobID. The response is [[RequestArchivedJob]]
    * @param jobID
    */
@@ -56,7 +61,7 @@ object ArchiveMessages {
       jobs.asJavaCollection
     }
   }
-  
+
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
@@ -64,4 +69,8 @@ object ArchiveMessages {
   def getRequestArchivedJobs : AnyRef = {
     RequestArchivedJobs
   }
+
+  def getRequestJobCounts : AnyRef = {
+    RequestJobCounts
+  }
 }


Mime
View raw message