flink-commits mailing list archives

From se...@apache.org
Subject [13/26] flink git commit: [FLINK-2358] [dashboard] First part dashboard server backend
Date Tue, 21 Jul 2015 19:10:52 GMT
[FLINK-2358] [dashboard] First part dashboard server backend

 - Adds a separate Maven project for easier maintenance. This also allows users to depend on the runtime without the web libraries.
 - Simple HTTP server based on Netty HTTP (a slim dependency, since we use Netty anyway)
 - REST URL parsing via Netty Router
 - Abstract stubs for handlers that deal with errors and request/response
 - First set of URL request handlers that produce JSON responses
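
For orientation, here is a minimal sketch (illustrative only, not part of the diff below) of how these pieces fit together: a RequestHandler turns URL path parameters into a JSON string, the shared RuntimeMonitorHandler adapts it to Netty (status code, MIME type, keep-alive), and the Netty Router maps REST URLs to handlers. The class ExampleRoutes and the trivial JobCountHandler are made up for illustration; Router, RuntimeMonitorHandler, RequestHandler, and RequestHandler.JsonResponse are introduced by this commit.

import io.netty.handler.codec.http.router.Router;

import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;

import java.util.Map;

public class ExampleRoutes {

	// A handler maps the URL path parameters to a JSON string; implementing the
	// RequestHandler.JsonResponse marker makes the response "application/json".
	static class JobCountHandler implements RequestHandler, RequestHandler.JsonResponse {
		@Override
		public String handleRequest(Map<String, String> pathParams) {
			return "{ \"jobs-running\": 0 }";
		}
	}

	public static Router createRouter() {
		// The router parses REST URLs (including ":param" segments) and passes the
		// extracted path parameters on to the wrapped handler.
		return new Router()
			.GET("/jobs/count", new RuntimeMonitorHandler(new JobCountHandler()));
	}
}

WebRuntimeMonitor (added below) wires such a router into a Netty ServerBootstrap and registers the real handlers for /config, /overview, /jobs, /jobs/:jobid, /jobs/:jobid/vertices, and /jobs/:jobid/plan.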

This closes #677
This closes #623
This closes #297


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44ee1c1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44ee1c1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44ee1c1b

Branch: refs/heads/master
Commit: 44ee1c1b64b18db199a7e77492dc890a1234fcb0
Parents: 75d1639
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Apr 6 18:27:26 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Jul 21 17:58:14 2015 +0200

----------------------------------------------------------------------
 .../flink/client/web/WebInterfaceServer.java    |  32 +-
 .../flink/configuration/ConfigConstants.java    |  20 +-
 flink-dist/pom.xml                              |   6 +
 flink-dist/src/main/resources/flink-conf.yaml   |   3 +
 .../plantranslate/JobGraphGenerator.java        |   4 +
 flink-runtime-web/pom.xml                       | 108 +++++
 .../webmonitor/ExecutionGraphHolder.java        |  91 ++++
 .../flink/runtime/webmonitor/JsonFactory.java   | 188 ++++++++
 .../runtime/webmonitor/NotFoundException.java   |  32 ++
 .../webmonitor/RuntimeMonitorHandler.java       |  91 ++++
 .../flink/runtime/webmonitor/TestRunner.java    | 197 +++++++++
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 177 ++++++++
 .../AbstractExecutionGraphRequestHandler.java   |  66 +++
 .../handlers/ExecutionPlanHandler.java          |  40 ++
 .../webmonitor/handlers/JobSummaryHandler.java  | 103 +++++
 .../handlers/JobVertexDetailsHandler.java       |  42 ++
 .../handlers/JobVerticesOverviewHandler.java    |  99 +++++
 .../runtime/webmonitor/handlers/Parameters.java |  40 ++
 .../handlers/RequestConfigHandler.java          |  41 ++
 .../webmonitor/handlers/RequestHandler.java     |  39 ++
 .../handlers/RequestJobIdsHandler.java          |  72 ++++
 .../handlers/RequestOverviewHandler.java        |  71 +++
 .../webmonitor/handlers/TextResponder.java      |  38 ++
 flink-runtime/pom.xml                           |   6 -
 .../runtime/executiongraph/ExecutionGraph.java  |  13 +
 .../executiongraph/ExecutionJobVertex.java      |  31 ++
 .../flink/runtime/instance/InstanceManager.java |  18 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  10 +
 .../runtime/jobmanager/web/WebInfoServer.java   | 107 ++---
 .../messages/webmonitor/InfoMessage.java        |  25 ++
 .../messages/webmonitor/JobsOverview.java       | 102 +++++
 .../webmonitor/JobsWithIDsOverview.java         |  90 ++++
 .../webmonitor/RequestJobsOverview.java         |  67 +++
 .../webmonitor/RequestJobsWithIDsOverview.java  |  67 +++
 .../webmonitor/RequestStatusOverview.java       |  68 +++
 .../RequestStatusWithJobIDsOverview.java        |  68 +++
 .../messages/webmonitor/StatusOverview.java     | 106 +++++
 .../webmonitor/StatusWithJobIDsOverview.java    |  82 ++++
 .../messages/webmonitor/package-info.java       |  25 ++
 .../flink/runtime/webmonitor/WebMonitor.java    |  48 +++
 .../flink/runtime/jobmanager/JobManager.scala   | 191 ++++++++-
 .../runtime/jobmanager/MemoryArchivist.scala    |  65 +++
 .../minicluster/LocalFlinkMiniCluster.scala     |  15 +-
 .../flink-shaded-hadoop1/pom.xml                |  88 ++++
 .../flink-shaded-include-yarn/pom.xml           | 428 +++++++++++++++++++
 .../apache/flink/test/util/TestBaseUtils.java   |   8 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   8 +-
 pom.xml                                         |   1 +
 48 files changed, 3201 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index 6384c9a..e30d45b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -28,11 +28,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.eclipse.jetty.http.security.Constraint;
-import org.eclipse.jetty.security.ConstraintMapping;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.security.HashLoginService;
-import org.eclipse.jetty.security.authentication.BasicAuthenticator;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.ContextHandler;
 import org.eclipse.jetty.server.handler.HandlerList;
@@ -182,32 +177,7 @@ public class WebInterfaceServer {
 				af = null;
 			}
 		}
-		if (af != null) {
-			HashLoginService loginService = new HashLoginService("Flink Query Engine Interface", authFile);
-			server.addBean(loginService);
-
-			Constraint constraint = new Constraint();
-			constraint.setName(Constraint.__BASIC_AUTH);
-			constraint.setAuthenticate(true);
-			constraint.setRoles(new String[] { "user" });
-
-			ConstraintMapping mapping = new ConstraintMapping();
-			mapping.setPathSpec("/*");
-			mapping.setConstraint(constraint);
-
-			ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
-			sh.addConstraintMapping(mapping);
-			sh.setAuthenticator(new BasicAuthenticator());
-			sh.setLoginService(loginService);
-			sh.setStrict(true);
-
-			// set the handers: the server hands the request to the security handler,
-			// which hands the request to the other handlers when authenticated
-			sh.setHandler(handlers);
-			server.setHandler(sh);
-		} else {
-			server.setHandler(handlers);
-		}
+		server.setHandler(handlers);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index e3b8b53..9690f41 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -236,7 +236,7 @@ public final class ConfigConstants {
 	public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";
 
 	/**
-	 * The heartbeat intervall between the Application Master and the YARN Resource Manager.
+	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
 	 *
 	 * The default value is 5 (seconds).
 	 */
@@ -300,9 +300,14 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 
 	/**
-	 * The config parameter defining the path to the htaccess file protecting the web frontend.
+	 * The option that specifies whether to use the new web frontend
 	 */
-	public static final String JOB_MANAGER_WEB_ACCESS_FILE_KEY = "jobmanager.web.access";
+	public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";
+
+	/**
+	 * The port for the runtime monitor web-frontend server.
+	 */
+	public static final String JOB_MANAGER_NEW_WEB_PORT_KEY = "jobmanager.new-web.port";
 	
 	/**
 	 * The config parameter defining the number of archived jobs for the jobmanager
@@ -310,6 +315,9 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
 	
 	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
+
+	/** The directory where the web server's static content is stored */
+	public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = "jobmanager.web.docroot";
 	
 	
 	// ------------------------------ Web Client ------------------------------
@@ -606,6 +614,12 @@ public final class ConfigConstants {
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
 
 	/**
+	 * The default port for the new web frontend of the JobManager.
+	 * Setting this value to {@code -1} disables the web frontend.
+	 */
+	public static final int DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT = 8082;
+	
+	/**
 	 * The default number of archived jobs for the jobmanager
 	 */
 	public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index e373b9a..2264119 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -61,6 +61,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime-web</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-optimizer</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 2a287dc..34d0790 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -68,6 +68,9 @@ jobmanager.web.port: 8081
 
 webclient.port: 8080
 
+# Temporary: Uncomment this to be able to use the new web frontend
+#jobmanager.new-web-frontend: true
+
 
 #==============================================================================
 # Streaming state checkpointing

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 281e425..6fd2796 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -48,6 +48,7 @@ import org.apache.flink.optimizer.plan.WorksetPlanNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
@@ -221,6 +222,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
 		}
 
+		String jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(program);
+		graph.setJsonPlan(jsonPlan);
+
 		// release all references again
 		this.vertices = null;
 		this.chainedTasks = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
new file mode 100644
index 0000000..ffe15af
--- /dev/null
+++ b/flink-runtime-web/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-runtime-web</artifactId>
+	<name>flink-runtime-web</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- ===================================================
+						Flink Dependencies
+			=================================================== -->
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- ===================================================
+						Dependencies for the Web Server
+			=================================================== -->
+
+		<dependency>
+			<groupId>tv.cntt</groupId>
+			<artifactId>netty-router</artifactId>
+			<version>1.10</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.javassist</groupId>
+			<artifactId>javassist</artifactId>
+			<version>3.18.2-GA</version>
+		</dependency>
+
+		<!-- ===================================================
+				Dependencies for Actor Calls
+			=================================================== -->
+		
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+		</dependency>
+
+		<!-- ===================================================
+								Utilities
+			=================================================== -->
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+		
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
new file mode 100644
index 0000000..18a548c
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.WeakHashMap;
+
+/**
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
+ * <p>
+ * The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
+ * at some point once no one else is pointing to the ExecutionGraph.
+ * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should
+ * stay valid.
+ */
+public class ExecutionGraphHolder {
+	
+	private final ActorRef source;
+	
+	private final FiniteDuration timeout;
+	
+	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
+	
+	
+	public ExecutionGraphHolder(ActorRef source) {
+		this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	}
+
+	public ExecutionGraphHolder(ActorRef source, FiniteDuration timeout) {
+		if (source == null || timeout == null) {
+			throw new NullPointerException();
+		}
+		this.source = source;
+		this.timeout = timeout;
+	}
+	
+	
+	public ExecutionGraph getExecutionGraph(JobID jid) {
+		ExecutionGraph cached = cache.get(jid);
+		if (cached != null) {
+			return cached;
+		}
+		
+		try {
+			Timeout to = new Timeout(timeout);
+			Future<Object> future = Patterns.ask(source, new JobManagerMessages.RequestJob(jid), to);
+			Object result = Await.result(future, timeout);
+			if (result instanceof JobManagerMessages.JobNotFound) {
+				return null;
+			}
+			else if (result instanceof JobManagerMessages.JobFound) {
+				ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+				cache.put(jid, eg);
+				return eg;
+			}
+			else {
+				throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+			}
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Error requesting execution graph", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
new file mode 100644
index 0000000..67c0b79
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This class implements the utility methods that convert the responses into JSON strings.
+ */
+public class JsonFactory {
+
+	public static String generateConfigJSON(long refreshInterval) {
+		try {
+			JSONObject response = new JSONObject();
+			response.put("refresh-interval", refreshInterval);
+			return response.toString(2);
+		}
+		catch (JSONException e) {
+			// this should not happen
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public static String generateOverviewJSON(StatusOverview overview) {
+		try {
+			JSONObject response = new JSONObject();
+			response.put("taskmanagers", overview.getNumTaskManagersConnected());
+			response.put("slots-total", overview.getNumSlotsTotal());
+			response.put("slots-available", overview.getNumSlotsAvailable());
+			response.put("jobs-running", overview.getNumJobsRunningOrPending());
+			response.put("jobs-finished", overview.getNumJobsFinished());
+			response.put("jobs-cancelled", overview.getNumJobsCancelled());
+			response.put("jobs-failed", overview.getNumJobsFailed());
+			return response.toString(2);
+		}
+		catch (JSONException e) {
+			// this should not happen
+			throw new RuntimeException(e);
+		}
+	}
+
+	public static String generateOverviewWithJobIDsJSON(StatusWithJobIDsOverview overview) {
+		try {
+			List<JobID> runningIDs = overview.getJobsRunningOrPending();
+			List<String> runningStrings = new ArrayList<String>(runningIDs.size());
+			for (JobID jid : runningIDs) {
+				runningStrings.add(jid.toString());
+			}
+
+			List<JobID> finishedIDs = overview.getJobsFinished();
+			List<String> finishedStrings = new ArrayList<String>(finishedIDs.size());
+			for (JobID jid : finishedIDs) {
+				finishedStrings.add(jid.toString());
+			}
+
+			List<JobID> canceledIDs = overview.getJobsCancelled();
+			List<String> canceledStrings = new ArrayList<String>(canceledIDs.size());
+			for (JobID jid : canceledIDs) {
+				canceledStrings.add(jid.toString());
+			}
+
+			List<JobID> failedIDs = overview.getJobsFailed();
+			List<String> failedStrings = new ArrayList<String>(failedIDs.size());
+			for (JobID jid : failedIDs) {
+				failedStrings.add(jid.toString());
+			}
+			
+			JSONObject response = new JSONObject();
+			response.put("taskmanagers", overview.getNumTaskManagersConnected());
+			response.put("slots-total", overview.getNumSlotsTotal());
+			response.put("slots-available", overview.getNumSlotsAvailable());
+			
+			
+			response.put("jobs-running", runningStrings);
+			response.put("jobs-finished", finishedStrings);
+			response.put("jobs-cancelled", canceledStrings);
+			response.put("jobs-failed", failedStrings);
+			return response.toString(2);
+		}
+		catch (JSONException e) {
+			// this should not happen
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public static String generateJobsOverviewJSON(JobsWithIDsOverview overview) {
+		try {
+			List<JobID> runningIDs = overview.getJobsRunningOrPending();
+			List<String> runningStrings = new ArrayList<String>(runningIDs.size());
+			for (JobID jid : runningIDs) {
+				runningStrings.add(jid.toString());
+			}
+
+			List<JobID> finishedIDs = overview.getJobsFinished();
+			List<String> finishedStrings = new ArrayList<String>(finishedIDs.size());
+			for (JobID jid : finishedIDs) {
+				finishedStrings.add(jid.toString());
+			}
+
+			List<JobID> canceledIDs = overview.getJobsCancelled();
+			List<String> canceledStrings = new ArrayList<String>(canceledIDs.size());
+			for (JobID jid : canceledIDs) {
+				canceledStrings.add(jid.toString());
+			}
+
+			List<JobID> failedIDs = overview.getJobsFailed();
+			List<String> failedStrings = new ArrayList<String>(failedIDs.size());
+			for (JobID jid : failedIDs) {
+				failedStrings.add(jid.toString());
+			}
+
+			JSONObject response = new JSONObject();
+			response.put("jobs-running", runningStrings);
+			response.put("jobs-finished", finishedStrings);
+			response.put("jobs-cancelled", canceledStrings);
+			response.put("jobs-failed", failedStrings);
+			return response.toString(2);
+		}
+		catch (JSONException e) {
+			// this should not happen
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public static String createJobSummaryJSON(JobID jid, String jobName, String state,
+												String start, String end, String duration,
+												int numOperators, int numOperatorsPending,
+												int numOperatorsRunning, int numOperatorsFinished,
+												int numOperatorsCanceling, int numOperatorsCanceled,
+												int numOperatorsFailed) {
+		try {
+			JSONObject json = new JSONObject();
+
+			json.put("jid", jid.toString());
+			json.put("name", jobName);
+			json.put("state", state);
+			json.put("start-time", start);
+			json.put("end-time", end);
+			json.put("duration", duration);
+			
+			JSONObject operators = new JSONObject();
+			operators.put("total", numOperators);
+			operators.put("pending", numOperatorsPending);
+			operators.put("running", numOperatorsRunning);
+			operators.put("finished", numOperatorsFinished);
+			operators.put("canceling", numOperatorsCanceling);
+			operators.put("canceled", numOperatorsCanceled);
+			operators.put("failed", numOperatorsFailed);
+			json.put("operators", operators);
+			
+			return json.toString(2);
+		}
+		catch (JSONException e) {
+			// this should not happen
+			throw new RuntimeException(e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** Don't instantiate */
+	private JsonFactory() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
new file mode 100644
index 0000000..71125c9
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+/**
+ * A special exception that indicates that an element was not found and that the
+ * request should be answered with a {@code 404} return code.
+ */
+public class NotFoundException extends Exception {
+
+	private static final long serialVersionUID = -4036006746423754639L;
+
+	public NotFoundException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
new file mode 100644
index 0000000..4574519
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.KeepAliveWrite;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.nio.charset.Charset;
+
+/**
+ * The Netty channel handler that processes all HTTP requests.
+ * This handler takes the path parameters and delegates the work to a {@link RequestHandler}.
+ * This handler also deals with setting correct response MIME types and returning
+ * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
+ */
+@ChannelHandler.Sharable
+public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
+	
+	private static final Charset ENCODING = Charset.forName("UTF-8");
+	
+	private final RequestHandler handler;
+	
+	private final String contentType;
+	
+	public RuntimeMonitorHandler(RequestHandler handler) {
+		if (handler == null) {
+			throw new NullPointerException();
+		}
+		this.handler = handler;
+		this.contentType = (handler instanceof RequestHandler.JsonResponse) ? "application/json" : "text/plain";
+	}
+	
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		DefaultFullHttpResponse response;
+		
+		try {
+			String result = handler.handleRequest(routed.pathParams());
+			byte[] bytes = result.getBytes(ENCODING);
+			
+			response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);
+		}
+		catch (NotFoundException e) {
+			// this should result in a 404 error code (not found)
+			ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
+					: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
+			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+		}
+		catch (Exception e) {
+			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
+			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+		}
+
+		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+		KeepAliveWrite.flush(ctx, routed.request(), response);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
new file mode 100644
index 0000000..eecc81a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.relational.util.WebLogData;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Simple runner that brings up a local cluster with the web server and executes two
+ * jobs to expose their data in the archive
+ */
+@SuppressWarnings("serial")
+public class TestRunner {
+
+	public static void main(String[] args) throws Exception {
+
+		// start the cluster with the runtime monitor
+		Configuration configuration = new Configuration();
+		configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
+		configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
+		configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
+			"/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
+		
+		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
+
+		final int port = cluster.getJobManagerRPCPort();
+		runWordCount(port);
+		runWebLogAnalysisExample(port);
+		runWordCount(port);
+
+		Object o = new Object();
+		synchronized (o) {
+			o.wait();
+		}
+		
+		cluster.shutdown();
+	}
+	
+	private static void runWordCount(int port) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
+		
+		DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		counts.print();
+	}
+	
+	private static void runWebLogAnalysisExample(int port) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
+
+		// get input data
+		DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
+		DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
+		DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
+
+		// Retain documents with keywords
+		DataSet<Tuple1<String>> filterDocs = documents
+				.filter(new FilterDocByKeyWords())
+				.project(0);
+
+		// Filter ranks by minimum rank
+		DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
+				.filter(new FilterByRank());
+
+		// Filter visits by visit date
+		DataSet<Tuple1<String>> filterVisits = visits
+				.filter(new FilterVisitsByDate())
+				.project(0);
+
+		// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
+		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
+				filterDocs.join(filterRanks)
+						.where(0).equalTo(1)
+						.projectSecond(0,1,2);
+
+		// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
+		DataSet<Tuple3<Integer, String, Integer>> result =
+				joinDocsRanks.coGroup(filterVisits)
+						.where(1).equalTo(0)
+						.with(new AntiJoinVisits());
+
+		result.print();
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
+
+		private static final String[] KEYWORDS = { " editors ", " oscillations " };
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			// FILTER
+			// Only collect the document if all keywords are contained
+			String docText = value.f1;
+			for (String kw : KEYWORDS) {
+				if (!docText.contains(kw)) {
+					return false;
+				}
+			}
+			return true;
+		}
+	}
+
+	public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
+
+		private static final int RANKFILTER = 40;
+
+		@Override
+		public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
+			return (value.f0 > RANKFILTER);
+		}
+	}
+
+
+	public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
+
+		private static final int YEARFILTER = 2007;
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			// Parse date string with the format YYYY-MM-DD and extract the year
+			String dateString = value.f1;
+			int year = Integer.parseInt(dateString.substring(0,4));
+			return (year == YEARFILTER);
+		}
+	}
+	
+	
+	@FunctionAnnotation.ForwardedFieldsFirst("*")
+	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+			// Check if there is an entry in the visits relation
+			if (!visits.iterator().hasNext()) {
+				for (Tuple3<Integer, String, Integer> next : ranks) {
+					// Emit all rank pairs
+					out.collect(next);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
new file mode 100644
index 0000000..a5c1c48
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorRef;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.router.BadClientSilencer;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Router;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobSummaryHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVerticesOverviewHandler;
+import org.apache.flink.runtime.webmonitor.handlers.RequestConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.RequestJobIdsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.RequestOverviewHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+public class WebRuntimeMonitor implements WebMonitor {
+
+	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+	
+	public static final long DEFAULT_REFRESH_INTERVAL = 2000;
+	
+	
+	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
+	
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	
+	private final Router router;
+
+	private final int configuredPort;
+
+	private ServerBootstrap bootstrap;
+	
+	private Channel serverChannel;
+
+	
+	public WebRuntimeMonitor(Configuration config, ActorRef jobManager, ActorRef archive) throws IOException {
+		
+		this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_NEW_WEB_PORT_KEY,
+												ConfigConstants.DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT);
+		if (this.configuredPort < 0) {
+			throw new IllegalArgumentException("Web frontend port is " + this.configuredPort);
+		}
+		
+		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
+		
+		router = new Router()
+			// configure how to interact with this web server
+			.GET("/config", handler(new RequestConfigHandler(DEFAULT_REFRESH_INTERVAL)))
+			
+			// the overview - how many task managers, slots, free slots, ...
+			.GET("/overview", handler(new RequestOverviewHandler(jobManager)))
+
+			// currently running jobs
+			.GET("/jobs", handler(new RequestJobIdsHandler(jobManager)))
+			.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
+			.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)));
+//			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
+
+	}
+
+	@Override
+	public void start() throws Exception {
+
+		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
+
+			@Override
+			protected void initChannel(SocketChannel ch) {
+				Handler handler = new Handler(router);
+				
+				ch.pipeline()
+					.addLast(new HttpServerCodec())
+					.addLast(handler.name(), handler)
+					.addLast(new BadClientSilencer());
+			}
+		};
+		
+		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
+		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+		this.bootstrap = new ServerBootstrap();
+		this.bootstrap.group(bossGroup, workerGroup)
+				.childOption(ChannelOption.TCP_NODELAY,  java.lang.Boolean.TRUE)
+				.childOption(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
+				.channel(NioServerSocketChannel.class)
+				.childHandler(initializer);
+
+		Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
+		
+		InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
+		String address = bindAddress.getAddress().getHostAddress();
+		int port = bindAddress.getPort();
+		
+		LOG.info("Web frontend listening at " + address + ':' + port);
+	}
+	
+	@Override
+	public void stop() throws Exception {
+		Channel server = this.serverChannel;
+		ServerBootstrap bootstrap = this.bootstrap;
+		
+		if (server != null) {
+			server.close().awaitUninterruptibly();
+			this.serverChannel = null;
+		}
+
+		if (bootstrap != null) {
+			if (bootstrap.group() != null) {
+				bootstrap.group().shutdownGracefully();
+			}
+			this.bootstrap = null;
+		}
+	}
+	
+	@Override
+	public int getServerPort() {
+		Channel server = this.serverChannel;
+		if (server != null) {
+			try {
+				return ((InetSocketAddress) server.localAddress()).getPort();
+			}
+			catch (Exception e) {
+				LOG.error("Cannot access local server port", e);
+			}
+		}
+			
+		return -1;
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private static RuntimeMonitorHandler handler(RequestHandler handler) {
+		return new RuntimeMonitorHandler(handler);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
new file mode 100644
index 0000000..44ca2d4
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
+
+import java.util.Map;
+
+/**
+ * Base class for request handlers whose response depends on an ExecutionGraph
+ * that can be retrieved via {@link Parameters#JOB_ID} parameter.
+ */
+public abstract class AbstractExecutionGraphRequestHandler implements RequestHandler {
+	
+	private final ExecutionGraphHolder executionGraphHolder;
+	
+	
+	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+		this.executionGraphHolder = executionGraphHolder;
+	}
+	
+	
+	@Override
+	public String handleRequest(Map<String, String> params) throws Exception {
+		String jidString = params.get(Parameters.JOB_ID);
+		if (jidString == null) {
+			throw new RuntimeException("JobId parameter missing");
+		}
+
+		JobID jid;
+		try {
+			jid = JobID.fromHexString(jidString);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage()); 
+		}
+		
+		ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid);
+		if (eg == null) {
+			throw new NotFoundException("Could not find execution graph for job " + jid);
+		}
+		
+		return handleRequest(eg, params);
+	}
+	
+	public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ExecutionPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ExecutionPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ExecutionPlanHandler.java
new file mode 100644
index 0000000..fd0a731
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ExecutionPlanHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns the JSON program plan of a job graph.
+ */
+public class ExecutionPlanHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+	
+	public ExecutionPlanHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		return graph.getJsonPlan();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
new file mode 100644
index 0000000..2dfe4de
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Request handler that returns a summary of the job status.
+ */
+public class JobSummaryHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+	private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+	
+	public JobSummaryHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+
+		JobID jid = graph.getJobID();
+		String name = graph.getJobName();
+		
+		long startTime = graph.getStatusTimestamp(JobStatus.CREATED);
+		long endTime = graph.getState().isTerminalState() ?
+				graph.getStatusTimestamp(graph.getState()) : -1;
+		
+		long duration = endTime == -1 ? System.currentTimeMillis() - startTime :
+				endTime - startTime;
+		
+		String startTimeString;
+		String endTimeTimeString;
+		String durationString = duration + " msecs";
+		
+		synchronized (dateFormatter) {
+			startTimeString = dateFormatter.format(new Date(startTime));
+			endTimeTimeString =  endTime == -1 ? "(pending)" : dateFormatter.format(new Date(endTime));
+		}
+		
+		String status = graph.getState().name();
+		
+		int pending = 0;
+		int running = 0;
+		int finished = 0;
+		int canceling = 0;
+		int canceled = 0;
+		int failed = 0;
+		
+		for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
+			ExecutionState aggState = vertex.getAggregateState();
+			switch (aggState) {
+				case FINISHED:
+					finished++;
+					break;
+				case FAILED:
+					failed++;
+					break;
+				case CANCELED:
+					canceled++;
+					break;
+				case RUNNING:
+					running++;
+					break;
+				case CANCELING:
+					canceling++;
+					break;
+				default:
+					pending++;
+			}
+		}
+		
+		int total = pending + running + finished + canceling + canceled + failed;
+		
+		return JsonFactory.createJobSummaryJSON(jid, name, status, startTimeString, endTimeString, durationString,
+				total, pending, running, finished, canceling, canceled, failed);
+	}
+}
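A detail worth calling out in JobSummaryHandler above: SimpleDateFormat is not thread-safe, which is why the timestamps are formatted inside a synchronized block on the shared formatter. A minimal standalone sketch of that pattern (class and method names here are illustrative, not part of this commit):

import java.text.SimpleDateFormat;
import java.util.Date;

public class SharedTimestampFormatter {

	// one formatter instance shared by all request-handling threads
	private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	public String format(long timestampMillis) {
		// SimpleDateFormat keeps mutable internal state, so concurrent
		// calls must be serialized on the shared instance
		synchronized (formatter) {
			return formatter.format(new Date(timestampMillis));
		}
	}
}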

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
new file mode 100644
index 0000000..17c31a5
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns details for a single job vertex (currently a placeholder implementation).
+ */
+public class JobVertexDetailsHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+	
+	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+		String vertexId = params.get(Parameters.JOB_VERTEX_ID);
+		
+		return "vertex: " + vertexId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVerticesOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVerticesOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVerticesOverviewHandler.java
new file mode 100644
index 0000000..6a0da66
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVerticesOverviewHandler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request handler that returns an overview of a job's vertices, including their parallelism and the execution states of their subtasks.
+ */
+public class JobVerticesOverviewHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
+
+	
+	public JobVerticesOverviewHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+
+		JSONObject obj = new JSONObject();
+		
+		obj.put("jid", graph.getJobID().toString());
+		obj.put("name", graph.getJobName());
+
+		List<JSONObject> vertexJSONs = new ArrayList<JSONObject>();
+
+		for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
+			JSONObject vertexJSON = new JSONObject();
+			vertexJSONs.add(vertexJSON);
+
+			// identifying parameters
+			JobVertex jobVertex = vertex.getJobVertex();
+			vertexJSON.put("id", jobVertex.getID().toString());
+			vertexJSON.put("name", jobVertex.getName());
+
+			// time (hard-coded placeholder values, real timestamps are not wired in yet)
+			vertexJSON.put("start-time", System.currentTimeMillis() - 10000);
+			vertexJSON.put("end-time", System.currentTimeMillis() - 6453);
+
+			// read / write statistics (hard-coded placeholder values, real metrics are not wired in yet)
+			vertexJSON.put("bytes-read-local", 14355376592L);
+			vertexJSON.put("bytes-read-remote", 607623465782L);
+			vertexJSON.put("bytes-written", 5372934L);
+			vertexJSON.put("records-read", 4659765L);
+			vertexJSON.put("records-written", 4659765L);
+
+			vertexJSON.put("parallelism", vertex.getParallelism());
+
+			JSONObject states = new JSONObject();
+			{
+				// count the occurrence of each state
+				int[] statesCount = new int[ExecutionState.values().length];
+				for (ExecutionVertex ev: vertex.getTaskVertices()) {
+					Execution ee = ev.getCurrentExecutionAttempt();
+					if (ee != null) {
+						statesCount[ee.getState().ordinal()]++;
+					}
+				}
+				
+				int i = 0;
+				for (ExecutionState state : ExecutionState.values()) {
+					states.put(state.name(), statesCount[i++]);
+				}
+			}
+			vertexJSON.put("states", states);
+		}
+
+		obj.put("vertices", vertexJSONs);
+		
+		return obj.toString();
+	}
+}
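For readers following the JSON contract, the response produced by JobVerticesOverviewHandler is roughly of the following shape. The values below are illustrative, the time and read/write fields are the hard-coded placeholders noted in the code, and the "states" object carries one entry per ExecutionState name (abbreviated here):

{
  "jid": "<job id>",
  "name": "<job name>",
  "vertices": [ {
    "id": "<job vertex id>",
    "name": "<vertex name>",
    "start-time": 1437504000000,
    "end-time": 1437504003547,
    "bytes-read-local": 14355376592,
    "bytes-read-remote": 607623465782,
    "bytes-written": 5372934,
    "records-read": 4659765,
    "records-written": 4659765,
    "parallelism": 4,
    "states": { "CREATED": 0, "RUNNING": 3, "FINISHED": 1, "FAILED": 0 }
  } ]
}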

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/Parameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/Parameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/Parameters.java
new file mode 100644
index 0000000..11dffb8
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/Parameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Parameter name constants.
+ */
+public class Parameters {
+
+	/**
+	 * The path parameter name for job IDs.
+	 */
+	public static final String JOB_ID = "jobid";
+
+	/**
+	 * The path parameter name for job vertex IDs.
+	 */
+	public static final String JOB_VERTEX_ID = "jobvertex";
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** Do not instantiate */
+	private Parameters() {}
+}
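The constants above are the keys under which the REST router is expected to place path variables, so handlers can look them up without repeating string literals. A minimal usage sketch, mirroring how JobVertexDetailsHandler reads its vertex id:

// inside a handler's handleRequest(...) body
String jobId = params.get(Parameters.JOB_ID);
String vertexId = params.get(Parameters.JOB_VERTEX_ID);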

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
new file mode 100644
index 0000000..edf70c1
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+
+import java.util.Map;
+
+/**
+ * Responder that returns the parameters that define how asynchronous requests
+ * against this web server should behave, for example the refresh interval.
+ */
+public class RequestConfigHandler implements RequestHandler, RequestHandler.JsonResponse {
+	
+	private final String configString;
+	
+	public RequestConfigHandler(long refreshInterval) {
+		this.configString = JsonFactory.generateConfigJSON(refreshInterval);
+	}
+	
+	@Override
+	public String handleRequest(Map<String, String> params) {
+		return this.configString;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
new file mode 100644
index 0000000..4ffb9d9
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import java.util.Map;
+
+public interface RequestHandler {
+
+	/**
+	 * This interface marks handlers that return JSON data.
+	 */
+	public static interface JsonResponse {}
+
+	/**
+	 * This interface marks handlers that return plain text data.
+	 */
+	public static interface TextResponse {}
+	
+	
+	// --------------------------------------------------------------------------------------------
+
+	String handleRequest(Map<String, String> params) throws Exception;
+}
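JsonResponse and TextResponse are empty marker interfaces; presumably the Netty-based RuntimeMonitorHandler checks them to decide which HTTP Content-Type to attach to the returned string. A hedged sketch of that dispatch (the method name and the fallback are assumptions, not taken from this commit):

private static String contentTypeFor(RequestHandler handler) {
	if (handler instanceof RequestHandler.JsonResponse) {
		return "application/json";
	}
	else if (handler instanceof RequestHandler.TextResponse) {
		return "text/plain";
	}
	else {
		// fall back to plain text for handlers without a marker interface
		return "text/plain";
	}
}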

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
new file mode 100644
index 0000000..a09bc1a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Map;
+
+/**
+ * Responder that returns a list of the JobIDs of all jobs found at the target actor.
+ * It may serve the IDs of currently running jobs or of past jobs, depending on whether
+ * this handler is given the JobManager or the archive actor reference.
+ */
+public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
+	
+	private final ActorRef target;
+	
+	private final FiniteDuration timeout;
+	
+	public RequestJobIdsHandler(ActorRef target) {
+		this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	}
+	
+	public RequestJobIdsHandler(ActorRef target, FiniteDuration timeout) {
+		if (target == null || timeout == null) {
+			throw new NullPointerException();
+		}
+		this.target = target;
+		this.timeout = timeout;
+	}
+	
+	@Override
+	public String handleRequest(Map<String, String> params) throws Exception {
+		// no parameters needed; request the full list of job IDs
+		try {
+			Timeout to = new Timeout(timeout); 
+			Future<Object> future = Patterns.ask(target, RequestJobsWithIDsOverview.getInstance(), to);
+			JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
+			return JsonFactory.generateJobsOverviewJSON(result);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
+		}
+	}
+}
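Because the handler blocks on Await.result, callers bound its latency purely through the timeout they pass in. A small instantiation sketch under assumptions (the 10 second timeout and the jobManagerRef parameter are made up for illustration):

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import scala.concurrent.duration.FiniteDuration;

// jobManagerRef is assumed to point to the JobManager (or the archive actor for finished jobs)
static String fetchJobIdsJson(ActorRef jobManagerRef) throws Exception {
	FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
	RequestJobIdsHandler handler = new RequestJobIdsHandler(jobManagerRef, timeout);
	// no path parameters are needed for this request
	return handler.handleRequest(Collections.<String, String>emptyMap());
}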

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
new file mode 100644
index 0000000..e51a4d1
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Map;
+
+/**
+ * Responder that returns the status of the Flink cluster, such as how many
+ * TaskManagers are currently connected, and how many jobs are running.
+ */
+public class RequestOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
+	
+	private final ActorRef jobManager;
+	
+	private final FiniteDuration timeout;
+	
+	
+	public RequestOverviewHandler(ActorRef jobManager) {
+		this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	}
+	
+	public RequestOverviewHandler(ActorRef jobManager, FiniteDuration timeout) {
+		if (jobManager == null || timeout == null) {
+			throw new NullPointerException();
+		}
+		this.jobManager = jobManager;
+		this.timeout = timeout;
+	}
+	
+	@Override
+	public String handleRequest(Map<String, String> params) throws Exception {
+		try {
+			Timeout to = new Timeout(timeout); 
+			Future<Object> future = Patterns.ask(jobManager, RequestStatusWithJobIDsOverview.getInstance(), to);
+			StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
+			return JsonFactory.generateOverviewWithJobIDsJSON(result);
+		}
+		catch (Exception e) {
+			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java
new file mode 100644
index 0000000..5cc7273
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import java.util.Map;
+
+/**
+ * Simple responder that returns a fixed plain-text message.
+ */
+public class TextResponder implements RequestHandler, RequestHandler.TextResponse {
+	
+	private final String message;
+	
+	public TextResponder(String message) {
+		this.message = message == null ? "" : message;
+	}
+	
+	@Override
+	public String handleRequest(Map<String, String> params) throws Exception {
+		return message;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 62e85c1..065467f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -75,12 +75,6 @@ under the License.
 
 		<dependency>
 			<groupId>org.eclipse.jetty</groupId>
-			<artifactId>jetty-security</artifactId>
-			<!-- version is derived from base module -->
-		</dependency>
-
-		<dependency>
-			<groupId>org.eclipse.jetty</groupId>
 			<artifactId>jetty-servlet</artifactId>
 			<!-- version is derived from base module -->
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 70dc89c..677c809 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -235,6 +235,8 @@ public class ExecutionGraph implements Serializable {
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private ExecutionConfig executionConfig;
 
+	private String jsonPlan;
+
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -420,6 +422,17 @@ public class ExecutionGraph implements Serializable {
 		return this.requiredJarFiles;
 	}
 
+	// --------------------------------------------------------------------------------------------
+
+
+	public void setJsonPlan(String jsonPlan) {
+		this.jsonPlan = jsonPlan;
+	}
+
+	public String getJsonPlan() {
+		return jsonPlan;
+	}
+
 	public Scheduler getScheduler() {
 		return scheduler;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index fcc0e9b..dea619a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -213,6 +214,36 @@ public class ExecutionJobVertex implements Serializable {
 		return numSubtasksInFinalState == parallelism;
 	}
 	
+	public ExecutionState getAggregateState() {
+		
+		int[] num = new int[ExecutionState.values().length];
+		
+		for (ExecutionVertex vertex : this.taskVertices) {
+			num[vertex.getExecutionState().ordinal()]++;
+		}
+
+		if (num[ExecutionState.FAILED.ordinal()] > 0) {
+			return ExecutionState.FAILED;
+		}
+		if (num[ExecutionState.CANCELING.ordinal()] > 0) {
+			return ExecutionState.CANCELING;
+		}
+		else if (num[ExecutionState.CANCELED.ordinal()] > 0) {
+			return ExecutionState.CANCELED;
+		}
+		else if (num[ExecutionState.RUNNING.ordinal()] > 0) {
+			return ExecutionState.RUNNING;
+		}
+		else if (num[ExecutionState.FINISHED.ordinal()] > 0) {
+			return num[ExecutionState.FINISHED.ordinal()] == parallelism ?
+					ExecutionState.FINISHED : ExecutionState.RUNNING;
+		}
+		else {
+			// all else collapses under created
+			return ExecutionState.CREATED;
+		}
+	}
+	
 	//---------------------------------------------------------------------------------------------
 	
 	public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
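The precedence encoded in getAggregateState() above is: any FAILED subtask makes the vertex FAILED, then CANCELING, then CANCELED, then RUNNING; FINISHED is only reported once every subtask has finished, and anything else collapses to CREATED. A few illustrative combinations (comments only, not test code from this commit):

// subtask states                        -> getAggregateState()
// {RUNNING, FAILED}                     -> FAILED
// {RUNNING, CANCELING}                  -> CANCELING
// {CANCELED, CREATED}                   -> CANCELED
// {FINISHED, RUNNING}                   -> RUNNING    (not all subtasks finished yet)
// {FINISHED, FINISHED}  (parallelism 2) -> FINISHED
// {CREATED, SCHEDULED, DEPLOYING}       -> CREATED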

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 4f6c7a6..ef49804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.instance;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -70,8 +70,8 @@ public class InstanceManager {
 	 * Creates an new instance manager.
 	 */
 	public InstanceManager() {
-		this.registeredHostsById = new HashMap<InstanceID, Instance>();
-		this.registeredHostsByConnection = new HashMap<ActorRef, Instance>();
+		this.registeredHostsById = new LinkedHashMap<InstanceID, Instance>();
+		this.registeredHostsByConnection = new LinkedHashMap<ActorRef, Instance>();
 		this.deadHosts = new HashSet<ActorRef>();
 	}
 
@@ -203,6 +203,18 @@ public class InstanceManager {
 	public int getTotalNumberOfSlots() {
 		return this.totalNumberOfAliveTaskSlots;
 	}
+	
+	public int getNumberOfAvailableSlots() {
+		synchronized (this.lock) {
+			int numSlots = 0;
+			
+			for (Instance i : this.registeredHostsById.values()) {
+				numSlots += i.getNumberOfAvailableSlots();
+			}
+			
+			return numSlots;
+		}
+	}
 
 	public Collection<Instance> getAllRegisteredInstances() {
 		synchronized (this.lock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 09b415b..d29d685 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -90,6 +90,8 @@ public class JobGraph implements Serializable {
 	private JobSnapshottingSettings snapshotSettings;
 	
 	
+	private String jsonPlan;
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -223,6 +225,14 @@ public class JobGraph implements Serializable {
 		return scheduleMode;
 	}
 
+	public String getJsonPlan() {
+		return jsonPlan;
+	}
+
+	public void setJsonPlan(String jsonPlan) {
+		this.jsonPlan = jsonPlan;
+	}
+
 	/**
 	 * Adds a new task vertex to the job graph if it is not already included.
 	 * 

