flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [05/51] [abbrv] flink git commit: [FLINK-2547] [web dashboard] Add handlers for cluster status and web dashboard configuration
Date Thu, 17 Sep 2015 18:19:37 GMT
[FLINK-2547] [web dashboard] Add handlers for cluster status and web dashboard configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01b0fc8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01b0fc8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01b0fc8f

Branch: refs/heads/master
Commit: 01b0fc8fc44d2454ef351928e73f5d49f9e41247
Parents: 562573d
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Aug 19 16:32:30 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Sep 17 14:21:49 2015 +0200

----------------------------------------------------------------------
 .../webmonitor/ExecutionGraphHolder.java        |  61 ++++------
 .../flink/runtime/webmonitor/JsonFactory.java   |  97 ++++++---------
 .../runtime/webmonitor/WebMonitorConfig.java    |  79 ++++++++++++
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 122 ++++++++-----------
 .../handlers/RequestConfigHandler.java          |  10 +-
 .../handlers/RequestJobIdsHandler.java          |  37 +++---
 .../handlers/RequestOverviewHandler.java        |  43 +++----
 .../legacy/JobManagerInfoHandler.java           |  50 +++-----
 .../RequestStatusWithJobIDsOverview.java        |  68 -----------
 .../webmonitor/StatusWithJobIDsOverview.java    |  82 -------------
 .../flink/runtime/jobmanager/JobManager.scala   |  21 +---
 11 files changed, 250 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 4efb7ad..74278a1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -23,8 +23,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -32,7 +30,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.WeakHashMap;
 
 /**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
  * <p>
  * The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
  * at some point once no one else is pointing to the ExecutionGraph.
@@ -41,62 +39,49 @@ import java.util.WeakHashMap;
  */
 public class ExecutionGraphHolder {
 
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
+	private final ActorGateway source;
 
-	/** Retrieves the current leading JobManager and its corresponding archive */
-	private final JobManagerArchiveRetriever retriever;
-	
 	private final FiniteDuration timeout;
-	
+
 	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
 
-	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever) {
-		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+
+	public ExecutionGraphHolder(ActorGateway source) {
+		this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		if (retriever == null || timeout == null) {
+	public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
+		if (source == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.retriever = retriever;
+		this.source = source;
 		this.timeout = timeout;
 	}
 
-	/**
-	 * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
-	 *
-	 * @param jid jobID of the execution graph to be retrieved
-	 * @return the retrieved execution graph or null if it is not retrievable
-	 */
+
 	public ExecutionGraph getExecutionGraph(JobID jid) {
 		ExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			return cached;
 		}
-		
-		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
 
-			if (jobManager != null) {
-
-				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
-				Object result = Await.result(future, timeout);
-				if (result instanceof JobManagerMessages.JobNotFound) {
-					return null;
-				} else if (result instanceof JobManagerMessages.JobFound) {
-					ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
-					cache.put(jid, eg);
-					return eg;
-				} else {
-					throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
-				}
-			} else {
-				LOG.warn("No connection to the leading JobManager.");
+		try {
+			Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
+			Object result = Await.result(future, timeout);
+			if (result instanceof JobManagerMessages.JobNotFound) {
 				return null;
 			}
+			else if (result instanceof JobManagerMessages.JobFound) {
+				ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+				cache.put(jid, eg);
+				return eg;
+			}
+			else {
+				throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+			}
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Error requesting execution graph", e);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
index 67c0b79..66449e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
@@ -18,95 +18,70 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
+
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 
-
 /**
  * This class implements the utility methods that convert the responses into JSON strings.
  */
 public class JsonFactory {
 
-	public static String generateConfigJSON(long refreshInterval) {
+	private static final com.fasterxml.jackson.core.JsonFactory jacksonFactory =
+			new com.fasterxml.jackson.core.JsonFactory();
+
+	public static String generateConfigJSON(long refreshInterval, long timeZoneOffset, String timeZoneName) {
 		try {
-			JSONObject response = new JSONObject();
-			response.put("refresh-interval", refreshInterval);
-			return response.toString(2);
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = jacksonFactory.createJsonGenerator(writer);
+			
+			gen.writeStartObject();
+			gen.writeNumberField("refresh-interval", refreshInterval);
+			gen.writeNumberField("timezone-offset", timeZoneOffset);
+			gen.writeStringField("timezone-name", timeZoneName);
+			gen.writeEndObject();
+			
+			gen.close();
+			return writer.toString();
 		}
-		catch (JSONException e) {
+		catch (Exception e) {
 			// this should not happen
-			throw new RuntimeException(e);
+			throw new RuntimeException(e.getMessage(), e);
 		}
 	}
 	
 	public static String generateOverviewJSON(StatusOverview overview) {
 		try {
-			JSONObject response = new JSONObject();
-			response.put("taskmanagers", overview.getNumTaskManagersConnected());
-			response.put("slots-total", overview.getNumSlotsTotal());
-			response.put("slots-available", overview.getNumSlotsAvailable());
-			response.put("jobs-running", overview.getNumJobsRunningOrPending());
-			response.put("jobs-finished", overview.getNumJobsFinished());
-			response.put("jobs-cancelled", overview.getNumJobsCancelled());
-			response.put("jobs-failed", overview.getNumJobsFailed());
-			return response.toString(2);
-		}
-		catch (JSONException e) {
-			// this should not happen
-			throw new RuntimeException(e);
-		}
-	}
-
-	public static String generateOverviewWithJobIDsJSON(StatusWithJobIDsOverview overview) {
-		try {
-			List<JobID> runningIDs = overview.getJobsRunningOrPending();
-			List<String> runningStrings = new ArrayList<String>(runningIDs.size());
-			for (JobID jid : runningIDs) {
-				runningStrings.add(jid.toString());
-			}
-
-			List<JobID> finishedIDs = overview.getJobsFinished();
-			List<String> finishedStrings = new ArrayList<String>(finishedIDs.size());
-			for (JobID jid : finishedIDs) {
-				finishedStrings.add(jid.toString());
-			}
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = jacksonFactory.createJsonGenerator(writer);
 
-			List<JobID> canceledIDs = overview.getJobsCancelled();
-			List<String> canceledStrings = new ArrayList<String>(canceledIDs.size());
-			for (JobID jid : canceledIDs) {
-				canceledStrings.add(jid.toString());
-			}
+			gen.writeStartObject();
+			gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+			gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+			gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+			gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+			gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+			gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+			gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+			gen.writeEndObject();
 
-			List<JobID> failedIDs = overview.getJobsFailed();
-			List<String> failedStrings = new ArrayList<String>(failedIDs.size());
-			for (JobID jid : failedIDs) {
-				failedStrings.add(jid.toString());
-			}
-			
-			JSONObject response = new JSONObject();
-			response.put("taskmanagers", overview.getNumTaskManagersConnected());
-			response.put("slots-total", overview.getNumSlotsTotal());
-			response.put("slots-available", overview.getNumSlotsAvailable());
-			
-			
-			response.put("jobs-running", runningStrings);
-			response.put("jobs-finished", finishedStrings);
-			response.put("jobs-cancelled", canceledStrings);
-			response.put("jobs-failed", failedStrings);
-			return response.toString(2);
+			gen.close();
+			return writer.toString();
 		}
-		catch (JSONException e) {
+		catch (Exception e) {
 			// this should not happen
-			throw new RuntimeException(e);
+			throw new RuntimeException(e.getMessage(), e);
 		}
 	}
+
 	
 	public static String generateJobsOverviewJSON(JobsWithIDsOverview overview) {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
new file mode 100644
index 0000000..c8e64c9
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+public class WebMonitorConfig {
+
+	// ------------------------------------------------------------------------
+	//  Config Keys
+	// ------------------------------------------------------------------------
+
+	/** The port for the runtime monitor web-frontend server. */
+	public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
+
+	/** The directory where the web server's static contents is stored */
+	public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY;
+
+	/** The initial refresh interval for the web dashboard */
+	public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";
+	
+	
+	// ------------------------------------------------------------------------
+	//  Default values
+	// ------------------------------------------------------------------------
+
+	/** Default port for the web dashboard (= 8081) */
+	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+
+	/** Default refresh interval for the web dashboard (= 3000 msecs) */
+	public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
+	
+	
+	// ------------------------------------------------------------------------
+	//  Config
+	// ------------------------------------------------------------------------
+	
+	/** The configuration queried by this config object */
+	private final Configuration config;
+
+	
+	public WebMonitorConfig(Configuration config) {
+		if (config == null) {
+			throw new NullPointerException();
+		}
+		this.config = config;
+	}
+	
+	
+	public int getWebFrontendPort() {
+		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+	}
+	
+	public String getWebRoot() {
+		return config.getString(JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
+	}
+	
+	public long getRefreshInterval() {
+		return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 4c38cae..5c7dc6b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import akka.actor.ActorSystem;
-import com.google.common.base.Preconditions;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -35,8 +33,7 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
@@ -60,7 +57,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * The root component of the web runtime monitor.
- * 
+ *
  * <p>The web runtime monitor is based in Netty HTTP. It uses the Netty-Router library to route
  * HTTP requests of different paths to different response handlers. In addition, it serves the static
  * files of the web frontend, such as HTML, CSS, or JS files.</p>
@@ -68,48 +65,37 @@ import java.util.concurrent.TimeUnit;
 public class WebRuntimeMonitor implements WebMonitor {
 
 	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-	
+
 	public static final long DEFAULT_REFRESH_INTERVAL = 5000;
 
 	/** Logger for web frontend startup / shutdown messages */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
-	
+
 	/** Teh default path under which the static contents is stored */
 	private static final String STATIC_CONTENTS_PATH = "resources/web-runtime-monitor";
-	
+
 	// ------------------------------------------------------------------------
-	
-	private final Object startupShutdownLock = new Object();
 
-	private final LeaderRetrievalService leaderRetrievalService;
+	private final Object startupShutdownLock = new Object();
 
-	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
-	private final JobManagerArchiveRetriever retriever;
+	private final Router router;
 
 	private final int configuredPort;
 
-	private final Router router;
-
 	private ServerBootstrap bootstrap;
-	
+
 	private Channel serverChannel;
 
-	// ------------------------------------------------------------------------
 
-	public WebRuntimeMonitor(
-			Configuration config,
-			LeaderRetrievalService leaderRetrievalService,
-			ActorSystem actorSystem)
-		throws IOException {
-		Preconditions.checkNotNull(config);
-		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
+	public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
+
+		final WebMonitorConfig cfg = new WebMonitorConfig(config);
 
 		// figure out where our static contents is
-		final String configuredWebRoot = config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
 		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
+		final String configuredWebRoot = cfg.getWebRoot();
 
 		final File webRootDir;
-		
 		if (configuredWebRoot != null) {
 			webRootDir = new File(configuredWebRoot);
 		}
@@ -117,52 +103,48 @@ public class WebRuntimeMonitor implements WebMonitor {
 			webRootDir = new File(flinkRoot, STATIC_CONTENTS_PATH);
 		}
 		else {
-			throw new IllegalConfigurationException("The given configuration provides neither the web-document root (" 
-					+ ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
+			throw new IllegalConfigurationException("The given configuration provides neither the web-document root ("
+					+ WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
 					+ ConfigConstants.FLINK_BASE_DIR_PATH_KEY + ").");
 		}
-		
+
 		// validate that the doc root is a valid directory
 		if (!(webRootDir.exists() && webRootDir.isDirectory() && webRootDir.canRead())) {
-			throw new IllegalConfigurationException("The path to the static contents (" + 
+			throw new IllegalConfigurationException("The path to the static contents (" +
 					webRootDir.getAbsolutePath() + ") is not a readable directory.");
 		}
-		
+
 		// port configuration
-		this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		this.configuredPort = cfg.getWebFrontendPort();
 		if (this.configuredPort < 0) {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
 		}
 
-		FiniteDuration timeout = AkkaUtils.getTimeout(config);
-		FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
-
-		retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
-
-		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
+		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
 
 		router = new Router()
-			// config how to interact with this web server
-			.GET("/config", handler(new RequestConfigHandler(DEFAULT_REFRESH_INTERVAL)))
+				// config how to interact with this web server
+				.GET("/config", handler(new RequestConfigHandler(cfg.getRefreshInterval())))
 
-			// the overview - how many task managers, slots, free slots, ...
-			.GET("/overview", handler(new RequestOverviewHandler(retriever)))
+						// the overview - how many task managers, slots, free slots, ...
+				.GET("/overview", handler(new RequestOverviewHandler(jobManager)))
 
-			// currently running jobs
-			.GET("/jobs", handler(new RequestJobIdsHandler(retriever)))
-			.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
-			.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
-			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
+						// currently running jobs
+				.GET("/jobs", handler(new RequestJobIdsHandler(jobManager)))
+				.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
+				.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
+				.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
+				.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
 
 //			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
 
-			// the handler for the legacy requests
-			.GET("/jobsInfo", new JobManagerInfoHandler(retriever, DEFAULT_REQUEST_TIMEOUT))
+						// the handler for the legacy requests
+				.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+
+						// this handler serves all the static contents
+				.GET("/:*", new StaticFileServerHandler(webRootDir));
+
 
-			// this handler serves all the static contents
-			.GET("/:*", new StaticFileServerHandler(webRootDir));
 	}
 
 	@Override
@@ -172,20 +154,20 @@ public class WebRuntimeMonitor implements WebMonitor {
 				throw new IllegalStateException("The server has already been started");
 			}
 
-			final Handler handler = new Handler(router);
-
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
 				@Override
 				protected void initChannel(SocketChannel ch) {
+					Handler handler = new Handler(router);
+
 					ch.pipeline()
-						.addLast(new HttpServerCodec())
-						.addLast(new HttpObjectAggregator(65536))
-						.addLast(new ChunkedWriteHandler())
-						.addLast(handler.name(), handler);
+							.addLast(new HttpServerCodec())
+							.addLast(new HttpObjectAggregator(65536))
+							.addLast(new ChunkedWriteHandler())
+							.addLast(handler.name(), handler);
 				}
 			};
-			
+
 			NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
 			NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 
@@ -194,25 +176,21 @@ public class WebRuntimeMonitor implements WebMonitor {
 					.group(bossGroup, workerGroup)
 					.channel(NioServerSocketChannel.class)
 					.childHandler(initializer);
-	
+
 			Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
 			this.serverChannel = ch;
 
 			InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
 			String address = bindAddress.getAddress().getHostAddress();
 			int port = bindAddress.getPort();
-			
-			LOG.info("Web frontend listening at " + address + ':' + port);
 
-			leaderRetrievalService.start(retriever);
+			LOG.info("Web frontend listening at " + address + ':' + port);
 		}
 	}
-	
+
 	@Override
 	public void stop() throws Exception {
 		synchronized (startupShutdownLock) {
-			leaderRetrievalService.stop();
-
 			if (this.serverChannel != null) {
 				this.serverChannel.close().awaitUninterruptibly();
 				this.serverChannel = null;
@@ -225,7 +203,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			}
 		}
 	}
-	
+
 	@Override
 	public int getServerPort() {
 		Channel server = this.serverChannel;
@@ -237,15 +215,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 				LOG.error("Cannot access local server port", e);
 			}
 		}
-			
+
 		return -1;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	private static RuntimeMonitorHandler handler(RequestHandler handler) {
 		return new RuntimeMonitorHandler(handler);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
index edf70c1..af9280e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestConfigHandler.java
@@ -21,17 +21,23 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
 
 import java.util.Map;
+import java.util.TimeZone;
 
 /**
  * Responder that returns the parameters that define how the asynchronous requests
- * against this web server should behave. It defines for example the refresh interval.
+ * against this web server should behave. It defines for example the refresh interval,
+ * and time zone of the server timestamps.
  */
 public class RequestConfigHandler implements RequestHandler, RequestHandler.JsonResponse {
 	
 	private final String configString;
 	
 	public RequestConfigHandler(long refreshInterval) {
-		this.configString = JsonFactory.generateConfigJSON(refreshInterval);
+		TimeZone timeZome = TimeZone.getDefault();
+		String timeZoneName = timeZome.getDisplayName();
+		long timeZoneOffset= timeZome.getRawOffset();
+		
+		this.configString = JsonFactory.generateConfigJSON(refreshInterval, timeZoneOffset, timeZoneName);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index 8a177f4..c3efce0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 
@@ -37,39 +36,33 @@ import java.util.Map;
  * given the JobManager or Archive Actor Reference.
  */
 public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
-	
-	private final JobManagerArchiveRetriever retriever;
-	
+
+	private final ActorGateway target;
+
 	private final FiniteDuration timeout;
-	
-	public RequestJobIdsHandler(JobManagerArchiveRetriever retriever) {
-		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+
+	public RequestJobIdsHandler(ActorGateway target) {
+		this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
-	
-	public RequestJobIdsHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		if (retriever == null || timeout == null) {
+
+	public RequestJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
+		if (target == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.retriever = retriever;
+		this.target = target;
 		this.timeout = timeout;
 	}
-	
+
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
-				JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
-				return JsonFactory.generateJobsOverviewJSON(result);
-			} else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
+			Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
+			JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
+			return JsonFactory.generateJobsOverviewJSON(result);
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index ce30122..7268945 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
+import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 
@@ -36,39 +35,33 @@ import java.util.Map;
  * TaskManagers are currently connected, and how many jobs are running.
  */
 public class RequestOverviewHandler implements  RequestHandler, RequestHandler.JsonResponse {
-	
-	private final JobManagerArchiveRetriever retriever;
-	
+
+	private final ActorGateway jobManager;
+
 	private final FiniteDuration timeout;
-	
-	
-	public RequestOverviewHandler(JobManagerArchiveRetriever retriever) {
-		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+
+
+	public RequestOverviewHandler(ActorGateway jobManager) {
+		this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
-	
-	public RequestOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		if (retriever == null || timeout == null) {
+
+	public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
+		if (jobManager == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.retriever = retriever;
+		this.jobManager = jobManager;
 		this.timeout = timeout;
 	}
-	
+
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
-			if (jobManager != null) {
-				Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
-				StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
-				return JsonFactory.generateOverviewWithJobIDsJSON(result);
-			} else {
-				throw new Exception("No connection to the leading job manager.");
-			}
+			Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
+			StatusOverview result = (StatusOverview) Await.result(future, timeout);
+			return JsonFactory.generateOverviewJSON(result);
 		}
 		catch (Exception e) {
 			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
index 3f1842b..c65bc0f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneou
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.util.StringUtils;
@@ -76,15 +75,14 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
 	/** Underlying JobManager */
-	private final JobManagerArchiveRetriever retriever;
+	private final ActorGateway jobmanager;
+	private final ActorGateway archive;
 	private final FiniteDuration timeout;
 
-	private ActorGateway jobmanager;
-	private ActorGateway archive;
 
-
-	public JobManagerInfoHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		this.retriever = retriever;
+	public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
+		this.jobmanager = jobmanager;
+		this.archive = archive;
 		this.timeout = timeout;
 	}
 
@@ -92,18 +90,6 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
 		DefaultFullHttpResponse response;
 		try {
-			jobmanager = retriever.getJobManagerGateway();
-
-			if (jobmanager == null) {
-				throw new Exception("No connection to leading JobManager.");
-			}
-
-			archive = retriever.getArchiveGateway();
-
-			if (archive == null) {
-				throw new Exception("No connection to leading JobManager.");
-			}
-
 			String result = handleRequest(routed);
 			byte[] bytes = result.getBytes(ENCODING);
 
@@ -121,15 +107,13 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 
 		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
 		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-		
+
 		KeepAliveWrite.flush(ctx, routed.request(), response);
 	}
-	
-	
-	@SuppressWarnings("unchecked")
-	private String handleRequest(Routed routed) throws Exception {
 
 
+	@SuppressWarnings("unchecked")
+	private String handleRequest(Routed routed) throws Exception {
 		if ("archive".equals(routed.queryParam("get"))) {
 			Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
 
@@ -302,7 +286,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 			}
 		}
 		bld.append("]");
-		
+
 		return bld.toString();
 	}
 
@@ -555,7 +539,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 		bld.append("}");
 		bld.append("}");
 		bld.append("]");
-		
+
 		return bld.toString();
 	}
 
@@ -584,7 +568,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 
 			//Serialize job to json
 			final StringBuilder bld = new StringBuilder();
-			
+
 			bld.append("{");
 			bld.append("\"jobid\": \"").append(jobId).append("\",");
 			bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\",");
@@ -667,15 +651,15 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 					bld.append("}");
 				}
 			}
-			
+
 			return bld.toString();
 		}
 	}
-	
+
 	private String writeJsonForArchivedJobGroupvertex(ExecutionGraph graph, JobVertexID vertexId) {
 		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
 		StringBuilder bld = new StringBuilder();
-		
+
 		bld.append("{\"groupvertex\": ").append(JsonFactory.toJson(jobVertex)).append(",");
 
 		bld.append("\"verticetimes\": {");
@@ -710,9 +694,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 		return bld.toString();
 	}
 
-	
+
 	private String writeJsonForVersion() {
-		return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" + 
+		return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" +
 				EnvironmentInformation.getRevisionInformation().commitId + "\"}";
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
deleted file mode 100644
index d26fe3d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-/**
- * This message requests an overview of the status, such as how many TaskManagers
- * are currently connected, how many slots are available, how many are free, ...
- * The response to this message is a {@link org.apache.flink.runtime.messages.webmonitor.StatusOverview} message.
- */
-public class RequestStatusWithJobIDsOverview implements InfoMessage {
-
-	private static final long serialVersionUID = 3052933564788843275L;
-	
-	// ------------------------------------------------------------------------
-	
-	private static final RequestStatusWithJobIDsOverview INSTANCE = new RequestStatusWithJobIDsOverview();
-
-	public static RequestStatusWithJobIDsOverview getInstance() {
-		return INSTANCE;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return RequestStatusWithJobIDsOverview.class.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == RequestStatusWithJobIDsOverview.class;
-	}
-
-	@Override
-	public String toString() {
-		return RequestStatusWithJobIDsOverview.class.getSimpleName();
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * No external instantiation
-	 */
-	private RequestStatusWithJobIDsOverview() {}
-
-	/**
-	 * Preserve the singleton property by returning the singleton instance
-	 */
-	private Object readResolve() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
deleted file mode 100644
index 72fb01b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-
-import java.util.List;
-
-/**
- * Response to the {@link org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview}
- * message, carrying a description of the Flink cluster status.
- */
-public class StatusWithJobIDsOverview extends JobsWithIDsOverview {
-
-	private static final long serialVersionUID = -729861859715105265L;
-	
-	private final int numTaskManagersConnected;
-	private final int numSlotsTotal;
-	private final int numSlotsAvailable;
-
-	public StatusWithJobIDsOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
-									List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
-									List<JobID> jobsCancelled, List<JobID> jobsFailed) {
-
-		super(jobsRunningOrPending, jobsFinished, jobsCancelled, jobsFailed);
-		
-		this.numTaskManagersConnected = numTaskManagersConnected;
-		this.numSlotsTotal = numSlotsTotal;
-		this.numSlotsAvailable = numSlotsAvailable;
-	}
-
-	public StatusWithJobIDsOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
-									JobsWithIDsOverview jobs1, JobsWithIDsOverview jobs2) {
-		super(jobs1, jobs2);
-		this.numTaskManagersConnected = numTaskManagersConnected;
-		this.numSlotsTotal = numSlotsTotal;
-		this.numSlotsAvailable = numSlotsAvailable;
-	}
-
-	public int getNumTaskManagersConnected() {
-		return numTaskManagersConnected;
-	}
-
-	public int getNumSlotsTotal() {
-		return numSlotsTotal;
-	}
-
-	public int getNumSlotsAvailable() {
-		return numSlotsAvailable;
-	}
-	
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return "StatusOverview {" +
-				"numTaskManagersConnected=" + numTaskManagersConnected +
-				", numSlotsTotal=" + numSlotsTotal +
-				", numSlotsAvailable=" + numSlotsAvailable +
-				", numJobsRunningOrPending=" + getJobsRunningOrPending() +
-				", numJobsFinished=" + getJobsFinished() +
-				", numJobsCancelled=" + getJobsCancelled() +
-				", numJobsFailed=" + getJobsFailed() +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/01b0fc8f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7046937..64ed129 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -871,6 +871,9 @@ class JobManager(
 
   /**
    * Dedicated handler for monitor info request messages.
+   * 
+   * Note that this handler does not fail. Errors while responding to info messages are logged,
+   * but will not cause the actor to crash.
    *
    * @param actorMessage The info request message.
    */
@@ -918,23 +921,7 @@ class JobManager(
                 ourJobs, archiveOverview)
           }(context.dispatcher)
 
-        case _ : RequestStatusWithJobIDsOverview =>
-
-          val ourJobs = createJobStatusWithIDsOverview()
-
-          val numTMs = instanceManager.getNumberOfRegisteredTaskManagers()
-          val numSlotsTotal = instanceManager.getTotalNumberOfSlots()
-          val numSlotsAvailable = instanceManager.getNumberOfAvailableSlots()
-
-          // add to that the jobs from the archive
-          val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
-          future.onSuccess {
-            case archiveOverview: JobsWithIDsOverview =>
-              theSender ! new StatusWithJobIDsOverview(numTMs, numSlotsTotal, numSlotsAvailable,
-                ourJobs, archiveOverview)
-          }(context.dispatcher)
-
-        case _ => throw new Exception("Unrecognized info message " + actorMessage)
+        case _ => log.error("Unrecognized info message " + actorMessage)
       }
     }
     catch {


Mime
View raw message