flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-2409] [webserver] Replaces ActorRefs with ActorGateways in the web server to automatically decorate messages with a leader session ID.
Date Mon, 03 Aug 2015 12:59:02 GMT
Repository: flink
Updated Branches:
  refs/heads/master 416ff589e -> fab61a195


[FLINK-2409] [webserver] Replaces ActorRefs with ActorGateways in the web server to automatically decorate messages with a leader session ID.

Refactored MiniCluster to also store a reference to the web server to stop it. Adds support for the new web interface for yarn

Fix web server start condition

This closes #959.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab61a19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab61a19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab61a19

Branch: refs/heads/master
Commit: fab61a1954ff1554448e826e1d273689ed520fc3
Parents: 416ff58
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Jul 29 18:03:52 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Aug 3 13:47:31 2015 +0200

----------------------------------------------------------------------
 .../webmonitor/ExecutionGraphHolder.java        | 14 +--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  5 +-
 .../handlers/RequestJobIdsHandler.java          | 14 +--
 .../handlers/RequestOverviewHandler.java        | 14 +--
 .../legacy/JobManagerInfoHandler.java           | 55 ++++++-----
 .../jobmanager/web/JobManagerInfoServlet.java   | 52 +++++------
 .../jobmanager/web/SetupInfoServlet.java        | 18 ++--
 .../runtime/jobmanager/web/WebInfoServer.java   |  8 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 97 +++++++++----------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 50 +++++++++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 45 +++++----
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../runtime/testingUtils/TestingCluster.scala   | 21 +++--
 .../test/util/ForkableFlinkMiniCluster.scala    | 41 ++++----
 .../apache/flink/yarn/ApplicationMaster.scala   | 98 +++++++++++++-------
 15 files changed, 293 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 18a548c..a017f3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
 import scala.concurrent.Await;
@@ -42,18 +39,18 @@ import java.util.WeakHashMap;
  */
 public class ExecutionGraphHolder {
 	
-	private final ActorRef source;
+	private final ActorGateway source;
 	
 	private final FiniteDuration timeout;
 	
 	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
 	
 	
-	public ExecutionGraphHolder(ActorRef source) {
+	public ExecutionGraphHolder(ActorGateway source) {
 		this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(ActorRef source, FiniteDuration timeout) {
+	public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
 		if (source == null || timeout == null) {
 			throw new NullPointerException();
 		}
@@ -69,8 +66,7 @@ public class ExecutionGraphHolder {
 		}
 		
 		try {
-			Timeout to = new Timeout(timeout);
-			Future<Object> future = Patterns.ask(source, new JobManagerMessages.RequestJob(jid), to);
+			Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
 			Object result = Await.result(future, timeout);
 			if (result instanceof JobManagerMessages.JobNotFound) {
 				return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a2095d4..006d18d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import akka.actor.ActorRef;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -35,6 +33,7 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
@@ -88,7 +87,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	private Channel serverChannel;
 
 	
-	public WebRuntimeMonitor(Configuration config, ActorRef jobManager, ActorRef archive) throws IOException {
+	public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
 		// figure out where our static contents is
 		final String configuredWebRoot = config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
 		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index 1f28a01..aa1a39f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
@@ -40,15 +37,15 @@ import java.util.Map;
  */
 public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
 	
-	private final ActorRef target;
+	private final ActorGateway target;
 	
 	private final FiniteDuration timeout;
 	
-	public RequestJobIdsHandler(ActorRef target) {
+	public RequestJobIdsHandler(ActorGateway target) {
 		this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 	
-	public RequestJobIdsHandler(ActorRef target, FiniteDuration timeout) {
+	public RequestJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
 		if (target == null || timeout == null) {
 			throw new NullPointerException();
 		}
@@ -60,8 +57,7 @@ public class RequestJobIdsHandler implements RequestHandler, RequestHandler.Json
 	public String handleRequest(Map<String, String> params) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			Timeout to = new Timeout(timeout); 
-			Future<Object> future = Patterns.ask(target, RequestJobsWithIDsOverview.getInstance(), to);
+			Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
 			JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
 			return JsonFactory.generateJobsOverviewJSON(result);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index e51a4d1..c2c00c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
@@ -39,16 +36,16 @@ import java.util.Map;
  */
 public class RequestOverviewHandler implements  RequestHandler, RequestHandler.JsonResponse {
 	
-	private final ActorRef jobManager;
+	private final ActorGateway jobManager;
 	
 	private final FiniteDuration timeout;
 	
 	
-	public RequestOverviewHandler(ActorRef jobManager) {
+	public RequestOverviewHandler(ActorGateway jobManager) {
 		this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 	
-	public RequestOverviewHandler(ActorRef jobManager, FiniteDuration timeout) {
+	public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
 		if (jobManager == null || timeout == null) {
 			throw new NullPointerException();
 		}
@@ -59,8 +56,7 @@ public class RequestOverviewHandler implements  RequestHandler, RequestHandler.J
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
 		try {
-			Timeout to = new Timeout(timeout); 
-			Future<Object> future = Patterns.ask(jobManager, RequestStatusWithJobIDsOverview.getInstance(), to);
+			Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
 			StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
 			return JsonFactory.generateOverviewWithJobIDsJSON(result);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
index 0a1e08c..9b52736 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.legacy;
 
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -41,6 +37,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -78,12 +75,12 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
 	/** Underlying JobManager */
-	private final ActorRef jobmanager;
-	private final ActorRef archive;
+	private final ActorGateway jobmanager;
+	private final ActorGateway archive;
 	private final FiniteDuration timeout;
 
 
-	public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+	public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
 		this.jobmanager = jobmanager;
 		this.archive = archive;
 		this.timeout = timeout;
@@ -118,8 +115,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	@SuppressWarnings("unchecked")
 	private String handleRequest(Routed routed) throws Exception {
 		if ("archive".equals(routed.queryParam("get"))) {
-			Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
-					new Timeout(timeout));
+			Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
 
 			Object result = Await.result(response, timeout);
 
@@ -135,8 +131,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 			}
 		}
 		else if ("jobcounts".equals(routed.queryParam("get"))) {
-			Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
-					new Timeout(timeout));
+			Future<Object> response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
 
 			Object result = Await.result(response, timeout);
 
@@ -152,8 +147,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 		else if ("job".equals(routed.queryParam("get"))) {
 			String jobId = routed.queryParam("job");
 
-			Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
-					new Timeout(timeout));
+			Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+					timeout);
 
 			Object result = Await.result(response, timeout);
 
@@ -182,8 +177,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 				throw new Exception("Found null groupVertexId");
 			}
 
-			Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
-					new Timeout(timeout));
+			Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+					timeout);
 
 			Object result = Await.result(response, timeout);
 
@@ -205,9 +200,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 			}
 		}
 		else if ("taskmanagers".equals(routed.queryParam("get"))) {
-			Future<Object> response = Patterns.ask(jobmanager,
+			Future<Object> response = jobmanager.ask(
 					JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-					new Timeout(timeout));
+					timeout);
 
 			Object result = Await.result(response, timeout);
 
@@ -219,9 +214,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 			else {
 				final int numberOfTaskManagers = (Integer)result;
 
-				final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+				final Future<Object> responseRegisteredSlots = jobmanager.ask(
 						JobManagerMessages.getRequestTotalNumberOfSlots(),
-						new Timeout(timeout));
+						timeout);
 
 				final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
 						timeout);
@@ -242,8 +237,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 		else if ("cancel".equals(routed.queryParam("get"))) {
 			String jobId = routed.queryParam("job");
 
-			Future<Object> response = Patterns.ask(jobmanager, new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
-					new Timeout(timeout));
+			Future<Object> response = jobmanager.ask(new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
+					timeout);
 
 			Await.ready(response, timeout);
 			return "{}";
@@ -256,8 +251,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 			return writeJsonForVersion();
 		}
 		else{
-			Future<Object> response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
-					new Timeout(timeout));
+			Future<Object> response = jobmanager.ask(JobManagerMessages.getRequestRunningJobs(),
+					timeout);
 
 			Object result = Await.result(response, timeout);
 
@@ -454,8 +449,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 		}
 
 		// write accumulators
-		final Future<Object> response = Patterns.ask(jobmanager,
-				new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+		final Future<Object> response = jobmanager.ask(
+				new RequestAccumulatorResultsStringified(graph.getJobID()),
+				timeout);
 
 		Object result;
 		try {
@@ -549,9 +545,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 
 
 	private String writeJsonUpdatesForJob(JobID jobId) {
-		final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager,
+		final Future<Object> responseArchivedJobs = jobmanager.ask(
 				JobManagerMessages.getRequestRunningJobs(),
-				new Timeout(timeout));
+				timeout);
 
 		Object resultArchivedJobs;
 		try{
@@ -591,8 +587,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 			}
 			bld.append("],");
 
-			final Future<Object> responseJob = Patterns.ask(jobmanager, new JobManagerMessages.RequestJob(jobId),
-					new Timeout(timeout));
+			final Future<Object> responseJob = jobmanager.ask(
+					new JobManagerMessages.RequestJob(jobId),
+					timeout);
 
 			Object resultJob;
 			try{

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 82ab63e..ce57714 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -32,12 +32,9 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
 import org.apache.flink.runtime.messages.ArchiveMessages;
@@ -78,12 +75,12 @@ public class JobManagerInfoServlet extends HttpServlet {
 	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);
 
 	/** Underlying JobManager */
-	private final ActorRef jobmanager;
-	private final ActorRef archive;
+	private final ActorGateway jobmanager;
+	private final ActorGateway archive;
 	private final FiniteDuration timeout;
 
 
-	public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+	public JobManagerInfoServlet(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
 		this.jobmanager = jobmanager;
 		this.archive = archive;
 		this.timeout = timeout;
@@ -102,8 +99,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 
 		try {
 			if("archive".equals(req.getParameter("get"))) {
-				response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
-						new Timeout(timeout));
+				response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
 
 				result = Await.result(response, timeout);
 
@@ -119,8 +115,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 				}
 			}
 			else if("jobcounts".equals(req.getParameter("get"))) {
-				response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
-						new Timeout(timeout));
+				response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
 
 				result = Await.result(response, timeout);
 
@@ -135,8 +130,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 			else if("job".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 
-				response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)),
-						new Timeout(timeout));
+				response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
 
 				result = Await.result(response, timeout);
 
@@ -163,8 +157,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 					return;
 				}
 
-				response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)),
-						new Timeout(timeout));
+				response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
 
 				result = Await.result(response, timeout);
 
@@ -186,9 +179,9 @@ public class JobManagerInfoServlet extends HttpServlet {
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
 
-				response = Patterns.ask(jobmanager,
+				response = jobmanager.ask(
 						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
+						timeout);
 
 				result = Await.result(response, timeout);
 
@@ -199,9 +192,9 @@ public class JobManagerInfoServlet extends HttpServlet {
 				} else {
 					final int numberOfTaskManagers = (Integer)result;
 
-					final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+					final Future<Object> responseRegisteredSlots = jobmanager.ask(
 							JobManagerMessages.getRequestTotalNumberOfSlots(),
-							new Timeout(timeout));
+							timeout);
 
 					final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
 							timeout);
@@ -221,8 +214,9 @@ public class JobManagerInfoServlet extends HttpServlet {
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 
-				response = Patterns.ask(jobmanager, new CancelJob(JobID.fromHexString(jobId)),
-						new Timeout(timeout));
+				response = jobmanager.ask(
+						new CancelJob(JobID.fromHexString(jobId)),
+						timeout);
 
 				Await.ready(response, timeout);
 			}
@@ -233,8 +227,9 @@ public class JobManagerInfoServlet extends HttpServlet {
 				writeJsonForVersion(resp.getWriter());
 			}
 			else{
-				response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
-						new Timeout(timeout));
+				response = jobmanager.ask(
+						JobManagerMessages.getRequestRunningJobs(),
+						timeout);
 
 				result = Await.result(response, timeout);
 
@@ -471,8 +466,8 @@ public class JobManagerInfoServlet extends HttpServlet {
 			}
 
 			// write accumulators
-			final Future<Object> response = Patterns.ask(jobmanager,
-					new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+			final Future<Object> response = jobmanager.ask(
+					new RequestAccumulatorResultsStringified(graph.getJobID()), timeout);
 
 			Object result;
 			try {
@@ -575,9 +570,9 @@ public class JobManagerInfoServlet extends HttpServlet {
 	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
 
 		try {
-			final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager,
+			final Future<Object> responseArchivedJobs = jobmanager.ask(
 					JobManagerMessages.getRequestRunningJobs(),
-					new Timeout(timeout));
+					timeout);
 
 			Object resultArchivedJobs = null;
 
@@ -615,8 +610,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 
 				wrt.write("],");
 
-				final Future<Object> responseJob = Patterns.ask(jobmanager, new RequestJob(jobId),
-						new Timeout(timeout));
+				final Future<Object> responseJob = jobmanager.ask(new RequestJob(jobId), timeout);
 
 				Object resultJob = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index c3df253..1f2bfe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -32,10 +32,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 
 import org.apache.flink.runtime.instance.InstanceID;
@@ -67,13 +65,13 @@ public class SetupInfoServlet extends HttpServlet {
 
 
 	final private Configuration configuration;
-	final private ActorRef jobmanager;
+	final private ActorGateway jobmanager;
 	final private FiniteDuration timeout;
 
 
-	public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration timeout) {
+	public SetupInfoServlet(Configuration conf, ActorGateway jobManager, FiniteDuration timeout) {
 		configuration = conf;
-		this.jobmanager = jm;
+		this.jobmanager = jobManager;
 		this.timeout = timeout;
 	}
 
@@ -114,9 +112,9 @@ public class SetupInfoServlet extends HttpServlet {
 
 	private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
 
-		final Future<Object> response = Patterns.ask(jobmanager,
+		final Future<Object> response = jobmanager.ask(
 				JobManagerMessages.getRequestRegisteredTaskManagers(),
-				new Timeout(timeout));
+				timeout);
 
 		Object obj = null;
 
@@ -183,9 +181,9 @@ public class SetupInfoServlet extends HttpServlet {
 		StackTrace message = null;
 		Throwable exception = null;
 
-		final Future<Object> response = Patterns.ask(jobmanager,
+		final Future<Object> response = jobmanager.ask(
 				new RequestStackTrace(instanceID),
-				new Timeout(timeout));
+				timeout);
 
 		try {
 			message = (StackTrace) Await.result(response, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index a414cf6..4383b65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -23,12 +23,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URL;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.Server;
@@ -45,7 +45,7 @@ import scala.concurrent.duration.FiniteDuration;
  * This class sets up a web-server that contains a web frontend to display information about running jobs.
  * It instantiates and configures an embedded jetty server.
  */
-public class WebInfoServer {
+public class WebInfoServer implements WebMonitor {
 
 	/** Web root dir in the jar */
 	private static final String WEB_ROOT_DIR = "web-docs-infoserver";
@@ -70,7 +70,7 @@ public class WebInfoServer {
 	 * @throws IOException
 	 *         Thrown, if the server setup failed for an I/O related reason.
 	 */
-	public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive) throws IOException {
+	public WebInfoServer(Configuration config, ActorGateway jobmanager, ActorGateway archive) throws IOException {
 		if (config == null) {
 			throw new IllegalArgumentException("No Configuration has been passed to the web server");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7bf4447..5c0f468 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1128,17 +1128,27 @@ object JobManager {
           "TaskManager_Process_Reaper")
       }
 
-      // start the job manager web frontend
-      if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
-        LOG.info("Starting NEW JobManger web frontend")
-        
-        // start the new web frontend. we need to load this dynamically
-        // because it is not in the same project/dependencies
-        startWebRuntimeMonitor(configuration, jobManager, archiver)
-      }
-      else if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
-        LOG.info("Starting JobManger web frontend")
-        val webServer = new WebInfoServer(configuration, jobManager, archiver)
+      if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+        val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
+        val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+        val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+        // start the job manager web frontend
+        val webServer = if (
+          configuration.getBoolean(
+            ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+            false)) {
+
+          LOG.info("Starting NEW JobManger web frontend")
+          // start the new web frontend. we need to load this dynamically
+          // because it is not in the same project/dependencies
+          startWebRuntimeMonitor(configuration, jobManagerGateway, archiverGateway)
+        }
+        else {
+          LOG.info("Starting JobManger web frontend")
+          new WebInfoServer(configuration, jobManagerGateway, archiverGateway)
+        }
+
         webServer.start()
       }
     }
@@ -1570,46 +1580,37 @@ object JobManager {
    * this method does not throw any exceptions, but only logs them.
    * 
    * @param config The configuration for the runtime monitor.
-   * @param jobManager The JobManager actor.
+   * @param jobManager The JobManager actor gateway.
    * @param archiver The execution graph archive actor.
    */
-  def startWebRuntimeMonitor(config: Configuration,
-                             jobManager: ActorRef,
-                             archiver: ActorRef): Unit = {
+  def startWebRuntimeMonitor(
+      config: Configuration,
+      jobManager: ActorGateway,
+      archiver: ActorGateway)
+    : WebMonitor = {
     // try to load and instantiate the class
-    val monitor: WebMonitor =
-      try {
-        val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
-        val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
-                                                 .asSubclass(classOf[WebMonitor])
-        
-        val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
-                                                                      classOf[ActorRef],
-                                                                      classOf[ActorRef])
-        ctor.newInstance(config, jobManager, archiver)
-      }
-      catch {
-        case e: ClassNotFoundException =>
-          LOG.error("Could not load web runtime monitor. " +
-              "Probably reason: flink-runtime-web is not in the classpath")
-          LOG.debug("Caught exception", e)
-          null
-        case e: InvocationTargetException =>
-          LOG.error("WebServer could not be created", e.getTargetException())
-          null
-        case t: Throwable =>
-          LOG.error("Failed to instantiate web runtime monitor.", t)
-          null
-      }
-    
-    if (monitor != null) {
-      try {
-        monitor.start()
-      }
-      catch {
-        case e: Exception => 
-          LOG.error("Failed to start web runtime monitor", e)
-      }
+    try {
+      val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
+      val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
+                                               .asSubclass(classOf[WebMonitor])
+
+      val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
+                                                                    classOf[ActorGateway],
+                                                                    classOf[ActorGateway])
+      ctor.newInstance(config, jobManager, archiver)
+    }
+    catch {
+      case e: ClassNotFoundException =>
+        LOG.error("Could not load web runtime monitor. " +
+            "Probably reason: flink-runtime-web is not in the classpath")
+        LOG.debug("Caught exception", e)
+        null
+      case e: InvocationTargetException =>
+        LOG.error("WebServer could not be created", e.getTargetException())
+        null
+      case t: Throwable =>
+        LOG.error("Failed to instantiate web runtime monitor.", t)
+        null
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6f810fc..7c57233 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -30,10 +30,12 @@ import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobExecutionException, JobClient,
 SerializedJobExecutionResult}
-import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
@@ -74,7 +76,7 @@ abstract class FlinkMiniCluster(
   val configuration = generateConfiguration(userConfiguration)
 
   var jobManagerActorSystem = startJobManagerActorSystem()
-  var jobManagerActor = startJobManager(jobManagerActorSystem)
+  var (jobManagerActor, webMonitor) = startJobManager(jobManagerActorSystem)
 
   val numTaskManagers = configuration.getInteger(
      ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
@@ -99,7 +101,7 @@ abstract class FlinkMiniCluster(
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
-  def startJobManager(system: ActorSystem): ActorRef
+  def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor])
 
   def startTaskManager(index: Int, system: ActorSystem): ActorRef
 
@@ -156,6 +158,10 @@ abstract class FlinkMiniCluster(
   }
 
   def shutdown(): Unit = {
+    webMonitor foreach {
+      _.stop()
+    }
+
     val futures = taskManagerActors map {
         gracefulStop(_, timeout)
     }
@@ -183,6 +189,44 @@ abstract class FlinkMiniCluster(
     }
   }
 
+  def startWebServer(
+      config: Configuration,
+      jobManager: ActorRef,
+      archiver: ActorRef)
+    : Option[WebMonitor] = {
+    if(
+      config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false) &&
+      config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+
+      val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+
+      val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+      val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+      // start the job manager web frontend
+      val webServer = if (
+        config.getBoolean(
+          ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+          false)) {
+
+        LOG.info("Starting NEW JobManger web frontend")
+        // start the new web frontend. we need to load this dynamically
+        // because it is not in the same project/dependencies
+        JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
+      }
+      else {
+        LOG.info("Starting JobManger web frontend")
+        new WebInfoServer(config, jobManagerGateway, archiverGateway)
+      }
+
+      webServer.start()
+
+      Option(webServer)
+    } else {
+      None
+    }
+  }
+
   def waitForTaskManagersToBeRegistered(): Unit = {
     implicit val executionContext = ExecutionContext.global
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index c056b63..54c457e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -23,12 +23,15 @@ import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
+import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import org.slf4j.LoggerFactory
 
@@ -42,9 +45,10 @@ import org.slf4j.LoggerFactory
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
  */
-class LocalFlinkMiniCluster(userConfiguration: Configuration,
-                            singleActorSystem: Boolean,
-                            streamingMode: StreamingMode)
+class LocalFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean,
+    streamingMode: StreamingMode)
   extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
 
   
@@ -74,23 +78,14 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
     config
   }
 
-  override def startJobManager(system: ActorSystem): ActorRef = {
+  override def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor]) = {
     val config = configuration.clone()
        
     val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
-    
-    if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
-      if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
-        // new web frontend
-        JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archiver)
-      }
-      else {
-        // old web frontend
-        val webServer = new WebInfoServer(configuration, jobManager, archiver)
-        webServer.start()
-      }
-    }
-    jobManager
+
+    val webMonitorOption = startWebServer(config, jobManager, archiver)
+
+    (jobManager, webMonitorOption)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -125,13 +120,15 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
       None
     }
     
-    TaskManager.startTaskManagerComponentsAndActor(config, system,
-                                                   hostname, // network interface to bind to
-                                                   Some(taskManagerActorName), // actor name
-                                                   jobManagerPath, // job manager akka URL
-                                                   localExecution, // start network stack?
-                                                   streamingMode,
-                                                   classOf[TaskManager])
+    TaskManager.startTaskManagerComponentsAndActor(
+      config,
+      system,
+      hostname, // network interface to bind to
+      Some(taskManagerActorName), // actor name
+      jobManagerPath, // job manager akka URL
+      localExecution, // start network stack?
+      streamingMode,
+      classOf[TaskManager])
   }
 
   def getJobClientActorSystem: ActorSystem = jobClientActorSystem

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f974946..0ec1040 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -171,7 +171,7 @@ class TaskManager(
 
   protected var leaderSessionID: Option[UUID] = None
 
-  private var currentRegistrationSessionID: UUID = UUID.randomUUID()
+  private val currentRegistrationSessionID: UUID = UUID.randomUUID()
 
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index ce0ef8d..f5a506d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.webmonitor.WebMonitor
 
 /**
  * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support
@@ -67,7 +68,7 @@ class TestingCluster(userConfiguration: Configuration,
     cfg
   }
 
-  override def startJobManager(actorSystem: ActorSystem): ActorRef = {
+  override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor]) = {
 
     val (executionContext,
       instanceManager,
@@ -103,7 +104,7 @@ class TestingCluster(userConfiguration: Configuration,
       jobManagerProps
     }
 
-    actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME)
+    (actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME), None)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem) = {
@@ -116,12 +117,14 @@ class TestingCluster(userConfiguration: Configuration,
       None
     }
     
-    TaskManager.startTaskManagerComponentsAndActor(configuration, system,
-                                                   hostname,
-                                                   Some(tmActorName),
-                                                   jobManagerPath,
-                                                   numTaskManagers == 1,
-                                                   streamingMode,
-                                                   classOf[TestingTaskManager])
+    TaskManager.startTaskManagerComponentsAndActor(
+      configuration,
+      system,
+      hostname,
+      Some(tmActorName),
+      jobManagerPath,
+      numTaskManagers == 1,
+      streamingMode,
+      classOf[TestingTaskManager])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index cdf3960..e83c7a6 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,12 +22,15 @@ import akka.actor.{Props, ActorRef, ActorSystem}
 import akka.pattern.Patterns._
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManager,
 TestingMemoryArchivist, TestingTaskManager}
+import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import scala.concurrent.Await
 
@@ -40,9 +43,10 @@ import scala.concurrent.Await
  * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
  *                          same [[ActorSystem]], otherwise false.
  */
-class ForkableFlinkMiniCluster(userConfiguration: Configuration,
-                               singleActorSystem: Boolean,
-                               streamingMode: StreamingMode)
+class ForkableFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean,
+    streamingMode: StreamingMode)
   extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
   
 
@@ -78,7 +82,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
     super.generateConfiguration(config)
   }
 
-  override def startJobManager(actorSystem: ActorSystem): ActorRef = {
+  override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor]) = {
 
     val (executionContext,
       instanceManager,
@@ -95,7 +99,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
         archiveCount)
       with TestingMemoryArchivist)
 
-    val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
+    val archiver = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
     
     val jobManagerProps = Props(
       new JobManager(
@@ -104,7 +108,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
         instanceManager,
         scheduler,
         libraryCacheManager,
-        archive,
+        archiver,
         executionRetries,
         delayBetweenRetries,
         timeout,
@@ -113,21 +117,9 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
 
     val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
 
-    if (userConfiguration.getBoolean(
-      ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false))
-    {
-      if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
-        // new web frontend
-        JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archive)
-      }
-      else {
-        // old web frontend
-        val webServer = new WebInfoServer(configuration, jobManager, archive)
-        webServer.start()
-      }
-    }
+    val webMonitorOption = startWebServer(configuration, jobManager, archiver)
 
-    jobManager
+    (jobManager, webMonitorOption)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -163,11 +155,18 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
     val stopped = gracefulStop(jobManagerActor, TestingUtils.TESTING_DURATION)
     Await.result(stopped, TestingUtils.TESTING_DURATION)
 
+    webMonitor foreach {
+      _.stop()
+    }
+
     jobManagerActorSystem.shutdown()
     jobManagerActorSystem.awaitTermination()
 
     jobManagerActorSystem = startJobManagerActorSystem()
-    jobManagerActor = startJobManager(jobManagerActorSystem)
+    val (newJobManagerActor, newWebMonitor) = startJobManager(jobManagerActorSystem)
+
+    jobManagerActor = newJobManagerActor
+    webMonitor = newWebMonitor
   }
 
   def restartTaskManager(index: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index c497a90..9e0c976 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -26,9 +26,11 @@ import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -68,7 +70,7 @@ object ApplicationMaster {
       override def run(): Object = {
 
         var actorSystem: ActorSystem = null
-        var webserver: WebInfoServer = null
+        var webserver: WebMonitor = null
 
         try {
           val conf = new YarnConfiguration()
@@ -99,25 +101,44 @@ object ApplicationMaster {
           val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
           val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
 
-          val (config: Configuration,
-               system: ActorSystem,
-               jobManager: ActorRef,
-               archiver: ActorRef) = startJobManager(currDir, ownHostname,
-                                                     dynamicPropertiesEncodedString,
-                                                     streamingMode)
+          val config = createConfiguration(currDir, dynamicPropertiesEncodedString)
+
+          val (
+            system: ActorSystem,
+            jobManager: ActorRef,
+            archiver: ActorRef) = startJobManager(
+              config,
+              ownHostname,
+              streamingMode)
+
           actorSystem = system
           val extActor = system.asInstanceOf[ExtendedActorSystem]
           val jobManagerPort = extActor.provider.getDefaultAddress.port.get
 
-          // start the web info server
           if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
+            // start the web info server
+            val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+            val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+            val archiverGateway = new AkkaActorGateway(
+              archiver,
+              jobManagerGateway.leaderSessionID())
+
             LOG.info("Starting Job Manger web frontend.")
             config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
             config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
             // set JobManager host/port for web interface.
             config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ownHostname)
             config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort)
-            webserver = new WebInfoServer(config, jobManager, archiver)
+
+            webserver = if(
+              config.getBoolean(
+                ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+                false)) {
+              JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
+            } else {
+              new WebInfoServer(config, jobManagerGateway, archiverGateway)
+            }
+
             webserver.start()
           }
 
@@ -160,11 +181,17 @@ object ApplicationMaster {
 
   }
 
-  def generateConfigurationFile(fileName: String, currDir: String, ownHostname: String,
-                               jobManagerPort: Int,
-                               jobManagerWebPort: Int, logDirs: String, slots: Int,
-                               taskManagerCount: Int, dynamicPropertiesEncodedString: String)
-  : Unit = {
+  def generateConfigurationFile(
+      fileName: String,
+      currDir: String,
+      ownHostname: String,
+      jobManagerPort: Int,
+      jobManagerWebPort: Int,
+      logDirs: String,
+      slots: Int,
+      taskManagerCount: Int,
+      dynamicPropertiesEncodedString: String)
+    : Unit = {
     LOG.info("Generate configuration file for application master.")
     val output = new PrintWriter(new BufferedWriter(
       new FileWriter(fileName))
@@ -208,26 +235,13 @@ object ApplicationMaster {
    *
    * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef)
    */
-  def startJobManager(currDir: String,
-                      hostname: String,
-                      dynamicPropertiesEncodedString: String,
-                      streamingMode: StreamingMode):
-    (Configuration, ActorSystem, ActorRef, ActorRef) = {
+  def startJobManager(
+      configuration: Configuration,
+      hostname: String,
+      streamingMode: StreamingMode)
+    : (ActorSystem, ActorRef, ActorRef) = {
 
     LOG.info("Starting JobManager for YARN")
-    LOG.info(s"Loading config from: $currDir.")
-
-    GlobalConfiguration.loadConfiguration(currDir)
-    val configuration = GlobalConfiguration.getConfiguration()
-
-    configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir)
-
-    // add dynamic properties to JobManager configuration.
-    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
-    import scala.collection.JavaConverters._
-    for(property <- dynamicProperties.asScala){
-      configuration.setString(property.f0, property.f1)
-    }
 
     // set port to 0 to let Akka automatically determine the port.
     LOG.debug("Starting JobManager actor system")
@@ -265,7 +279,25 @@ object ApplicationMaster {
     LOG.debug("Starting JobManager actor")
     val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
 
-    (configuration, jobManagerSystem, jobManager, archiver)
+    (jobManagerSystem, jobManager, archiver)
+  }
+
+  def createConfiguration(curDir: String, dynamicPropertiesEncodedString: String): Configuration = {
+    LOG.info(s"Loading config from: $curDir.")
+
+    GlobalConfiguration.loadConfiguration(curDir)
+    val configuration = GlobalConfiguration.getConfiguration()
+
+    configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, curDir)
+
+    // add dynamic properties to JobManager configuration.
+    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+    import scala.collection.JavaConverters._
+    for(property <- dynamicProperties.asScala){
+      configuration.setString(property.f0, property.f1)
+    }
+
+    configuration
   }
 
 


Mime
View raw message