flink-commits mailing list archives

From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-1508] [runtime] Removes AkkaUtils.ask and replaces respective calls with explicit future handling.
Date Sat, 14 Feb 2015 17:10:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master 69cba1f15 -> 589b539c5


[FLINK-1508] [runtime] Removes AkkaUtils.ask and replaces respective calls with explicit future handling.

Removes blocking calls for ActorRef retrieval in actors.

This closes #384.
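
In practice, every former AkkaUtils.ask call site in this patch follows the
same explicit pattern: send the message with Patterns.ask, block with
Await.result under the configured timeout, and verify the response type
before casting. A minimal sketch of that pattern (the wrapper class and
method names here are illustrative, not part of the patch):

    import java.io.IOException;

    import akka.actor.ActorRef;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import org.apache.flink.runtime.messages.JobManagerMessages;
    import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.FiniteDuration;

    class ExplicitAskSketch {
        // Ask explicitly, await the future, and check the response type
        // before casting: the steps AkkaUtils.ask used to hide.
        RunningJobs requestRunningJobs(ActorRef jobManager, FiniteDuration timeout)
                throws IOException {
            final Future<Object> response = Patterns.ask(jobManager,
                    JobManagerMessages.getRequestRunningJobs(), new Timeout(timeout));

            final Object result;
            try {
                result = Await.result(response, timeout);
            } catch (Exception e) {
                throw new IOException("Could not retrieve running jobs from the job manager.", e);
            }

            if (!(result instanceof RunningJobs)) {
                throw new RuntimeException("RequestRunningJobs requires a response of type " +
                        "RunningJobs. Instead the response is of type " + result.getClass() + ".");
            }
            return (RunningJobs) result;
        }
    }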


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1583dcfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1583dcfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1583dcfc

Branch: refs/heads/master
Commit: 1583dcfc7bdcd31ba5e951835fc1d4ca60a23948
Parents: 69cba1f
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 10 12:21:33 2015 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sat Feb 14 16:16:34 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 151 +++---
 .../flink/client/web/JobsInfoServlet.java       |  93 ++--
 .../jobmanager/web/JobManagerInfoServlet.java   | 458 ++++++++++++-------
 .../jobmanager/web/SetupInfoServlet.java        |  92 ++--
 .../taskmanager/TaskInputSplitProvider.java     |  30 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  23 +-
 .../apache/flink/runtime/client/JobClient.scala |  36 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  28 +-
 .../runtime/messages/RegistrationMessages.scala |   9 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  33 +-
 .../jobmanager/JobManagerStartupTest.java       |  10 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   6 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  18 +-
 .../TaskManagerRegistrationITCase.scala         |   2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |  46 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  44 +-
 .../scala/org/apache/flink/yarn/Messages.scala  |   7 +
 17 files changed, 686 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
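
The other recurring change in the files listed above concerns ActorRef
retrieval: AkkaUtils.getReference and getChild, as well as
JobManager.getJobManager, now return a Future[ActorRef] backed by
actorSelection(...).resolveOne(), and each caller awaits the result
explicitly instead of the helper blocking internally. A short sketch of the
lookup as CliFrontend and JobsInfoServlet below perform it (the wrapper
class name is illustrative):

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import org.apache.flink.runtime.jobmanager.JobManager;
    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.FiniteDuration;

    class JobManagerLookupSketch {
        // The lookup now yields a Future<ActorRef>; the caller decides
        // where to block, and failures surface as a checked exception.
        ActorRef resolve(InetSocketAddress address, ActorSystem system,
                FiniteDuration timeout) throws IOException {
            Future<ActorRef> jobManagerFuture =
                    JobManager.getJobManager(address, system, timeout);
            try {
                return Await.result(jobManagerFuture, timeout);
            } catch (Exception e) {
                throw new IOException("Could not find job manager at address " +
                        JobManager.getRemoteAkkaURL(address) + ".", e);
            }
        }
    }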


http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 2f905df..dc14a2f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -37,6 +37,8 @@ import java.util.Properties;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -57,6 +59,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -64,11 +67,12 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -517,67 +521,83 @@ public class CliFrontend {
 				return 1;
 			}
 
-			Iterable<ExecutionGraph> jobs = AkkaUtils.<RunningJobs>ask(jobManager,
-					RequestRunningJobs$.MODULE$, getAkkaTimeout()).asJavaIterable();
+			final Future<Object> response = Patterns.ask(jobManager,
+					JobManagerMessages.getRequestRunningJobs(), new Timeout(getAkkaTimeout()));
 
-			ArrayList<ExecutionGraph> runningJobs = null;
-			ArrayList<ExecutionGraph> scheduledJobs = null;
-			if (running) {
-				runningJobs = new ArrayList<ExecutionGraph>();
-			}
-			if (scheduled) {
-				scheduledJobs = new ArrayList<ExecutionGraph>();
+			Object result = null;
+
+			try{
+				result = Await.result(response, getAkkaTimeout());
+			} catch (Exception exception) {
+				throw new IOException("Could not retrieve running jobs from job manager.",
+						exception);
 			}
-			
-			for (ExecutionGraph rj : jobs) {
-				
-				if (running && rj.getState().equals(JobStatus.RUNNING)) {
-					runningJobs.add(rj);
+
+			if(!(result instanceof RunningJobs)){
+				throw new RuntimeException("RequestRunningJobs requires a response of type " +
+						"RunningJobs. Instead the response is of type " + result.getClass() + ".");
+			} else {
+				Iterable<ExecutionGraph> jobs = ((RunningJobs) result).asJavaIterable();
+
+				ArrayList<ExecutionGraph> runningJobs = null;
+				ArrayList<ExecutionGraph> scheduledJobs = null;
+				if (running) {
+					runningJobs = new ArrayList<ExecutionGraph>();
 				}
-				if (scheduled && rj.getState().equals(JobStatus.CREATED)) {
-					scheduledJobs.add(rj);
+				if (scheduled) {
+					scheduledJobs = new ArrayList<ExecutionGraph>();
 				}
-			}
-			
-			SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
-			Comparator<ExecutionGraph> njec = new Comparator<ExecutionGraph>(){
-				
-				@Override
-				public int compare(ExecutionGraph o1, ExecutionGraph o2) {
-					return (int)(o1.getStatusTimestamp(o1.getState())-o2.getStatusTimestamp(o2
-							.getState()));
+
+				for (ExecutionGraph rj : jobs) {
+
+					if (running && rj.getState().equals(JobStatus.RUNNING)) {
+						runningJobs.add(rj);
+					}
+					if (scheduled && rj.getState().equals(JobStatus.CREATED)) {
+						scheduledJobs.add(rj);
+					}
 				}
-			};
-			
-			if (running) {
-				if(runningJobs.size() == 0) {
-					System.out.println("No running jobs.");
-				} else {
-					Collections.sort(runningJobs, njec);
-					
-					System.out.println("------------------------ Running Jobs ------------------------");
-					for(ExecutionGraph rj : runningJobs) {
-						System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
-								+" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+
+				SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
+				Comparator<ExecutionGraph> njec = new Comparator<ExecutionGraph>(){
+
+					@Override
+					public int compare(ExecutionGraph o1, ExecutionGraph o2) {
+						return (int)(o1.getStatusTimestamp(o1.getState())-o2.getStatusTimestamp(o2
+								.getState()));
+					}
+				};
+
+				if (running) {
+					if(runningJobs.size() == 0) {
+						System.out.println("No running jobs.");
+					} else {
+						Collections.sort(runningJobs, njec);
+
+						System.out.println("------------------------ Running Jobs ------------------------");
+						for(ExecutionGraph rj : runningJobs) {
+							System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
+									+" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+						}
+						System.out.println("--------------------------------------------------------------");
 					}
-					System.out.println("--------------------------------------------------------------");
 				}
-			}
-			if (scheduled) {
-				if(scheduledJobs.size() == 0) {
-					System.out.println("No scheduled jobs.");
-				} else {
-					Collections.sort(scheduledJobs, njec);
-					
-					System.out.println("----------------------- Scheduled Jobs -----------------------");
-					for(ExecutionGraph rj : scheduledJobs) {
-						System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
-								+" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+				if (scheduled) {
+					if(scheduledJobs.size() == 0) {
+						System.out.println("No scheduled jobs.");
+					} else {
+						Collections.sort(scheduledJobs, njec);
+
+						System.out.println("----------------------- Scheduled Jobs -----------------------");
+						for(ExecutionGraph rj : scheduledJobs) {
+							System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
+									+" : "+rj.getJobID().toString()+" : "+rj.getJobName());
+						}
+						System.out.println("--------------------------------------------------------------");
 					}
-					System.out.println("--------------------------------------------------------------");
 				}
+				return 0;
 			}
-			return 0;
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -637,7 +657,16 @@ public class CliFrontend {
 				return 1;
 			}
 
-			AkkaUtils.ask(jobManager, new CancelJob(jobId), getAkkaTimeout());
+			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId),
+					new Timeout(getAkkaTimeout()));
+
+			try {
+				Await.ready(response, getAkkaTimeout());
+			} catch (Exception exception) {
+				throw new IOException("Canceling the job with job ID " + jobId + " failed.",
+						exception);
+			}
+
 			return 0;
 		}
 		catch (Throwable t) {
@@ -753,9 +782,19 @@ public class CliFrontend {
 			return null;
 		}
 
-		return JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr),
-				ActorSystem.create("CliFrontendActorSystem",
-						AkkaUtils.getDefaultAkkaConfig()),getAkkaTimeout());
+		InetSocketAddress address = RemoteExecutor.getInetFromHostport(jobManagerAddressStr);
+
+		Future<ActorRef> jobManagerFuture = JobManager.getJobManager(
+				address,
+				ActorSystem.create("CliFrontendActorSystem", AkkaUtils.getDefaultAkkaConfig()),
+				getAkkaTimeout());
+
+		try{
+			return Await.result(jobManagerFuture, getAkkaTimeout());
+		} catch (Exception exception) {
+			throw new IOException("Could not find job manager at address " +
+					JobManager.getRemoteAkkaURL(address) + ".", exception);
+		}
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
index f83e9b4..b638326 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
@@ -31,14 +31,18 @@ import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.Future;
 
 
 public class JobsInfoServlet extends HttpServlet {
@@ -54,12 +58,29 @@ public class JobsInfoServlet extends HttpServlet {
 	private final ActorSystem system;
 
 	private final FiniteDuration timeout;
+
+	private final ActorRef jobmanager;
 	
 	public JobsInfoServlet(Configuration flinkConfig) {
 		this.config = flinkConfig;
 		system = ActorSystem.create("JobsInfoServletActorSystem",
 				AkkaUtils.getDefaultAkkaConfig());
 		this.timeout = AkkaUtils.getTimeout(flinkConfig);
+
+		String jmHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+		InetSocketAddress address = new InetSocketAddress(jmHost, jmPort);
+
+		Future<ActorRef> jobManagerFuture = JobManager.getJobManager(address, system, timeout);
+
+		try {
+			this.jobmanager = Await.result(jobManagerFuture, timeout);
+		} catch (Exception ex) {
+			throw new RuntimeException("Could not find job manager at specified address " +
+					JobManager.getRemoteAkkaURL(address) + ".", ex);
+		}
 	}
 
 	@Override
@@ -67,38 +88,48 @@ public class JobsInfoServlet extends HttpServlet {
 		//resp.setContentType("application/json");
 		
 		try {
-			String jmHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-			int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-					ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
-			ActorRef jm = JobManager.getJobManager(new InetSocketAddress(jmHost, jmPort), system,
-					timeout);
-
-			Iterator<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jm,
-					RequestRunningJobs$.MODULE$, timeout).asJavaIterable().iterator();
-
-
-			resp.setStatus(HttpServletResponse.SC_OK);
-			PrintWriter wrt = resp.getWriter();
-			wrt.write("[");
-			while(graphs.hasNext()){
-				ExecutionGraph graph = graphs.next();
-				//Serialize job to json
-				wrt.write("{");
-				wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-				if(graph.getJobName() != null) {
-					wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-				}
-				wrt.write("\"status\": \""+ graph.getState() + "\",");
-				wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
-				wrt.write("}");
-				//Write seperator between json objects
-				if(graphs.hasNext()) {
-					wrt.write(",");
+			final Future<Object> response = Patterns.ask(jobmanager,
+					JobManagerMessages.getRequestRunningJobs(),
+					new Timeout(timeout));
+
+			Object result = null;
+
+			try {
+				result = Await.result(response, timeout);
+			} catch (Exception exception) {
+				throw new IOException("Could not retrieve the running jobs from the job manager.",
+						exception);
+			}
+
+			if(!(result instanceof RunningJobs)) {
+				throw new RuntimeException("RequestRunningJobs requires a response of type " +
+						"RunningJobs. Instead the response is of type " + result.getClass() + ".");
+			} else {
+
+				final Iterator<ExecutionGraph> graphs = ((RunningJobs) result).
+						asJavaIterable().iterator();
+
+				resp.setStatus(HttpServletResponse.SC_OK);
+				PrintWriter wrt = resp.getWriter();
+				wrt.write("[");
+				while(graphs.hasNext()){
+					ExecutionGraph graph = graphs.next();
+					//Serialize job to json
+					wrt.write("{");
+					wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+					if(graph.getJobName() != null) {
+						wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+					}
+					wrt.write("\"status\": \""+ graph.getState() + "\",");
+					wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
+					wrt.write("}");
+					//Write separator between json objects
+					if(graphs.hasNext()) {
+						wrt.write(",");
+					}
 				}
+				wrt.write("]");
 			}
-			wrt.write("]");
-			
 		} catch (Throwable t) {
 			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
 			resp.getWriter().print(t.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 983738e..961a464 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -35,7 +35,8 @@ import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
 
-import org.apache.flink.runtime.akka.AkkaUtils;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
 import org.apache.flink.runtime.messages.ArchiveMessages;
@@ -43,7 +44,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
@@ -64,6 +64,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class JobManagerInfoServlet extends HttpServlet {
@@ -86,60 +88,121 @@ public class JobManagerInfoServlet extends HttpServlet {
 	
 	
 	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
+			IOException {
 			
 		resp.setStatus(HttpServletResponse.SC_OK);
 		resp.setContentType("application/json");
+
+		Future<Object> response;
+		Object result;
 		
 		try {
 			if("archive".equals(req.getParameter("get"))) {
-				List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
-						.<ArchivedJobs>ask(archive, ArchiveMessages.getRequestArchivedJobs(), timeout)
-						.asJavaCollection());
+				response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
+						new Timeout(timeout));
+
+				result = Await.result(response, timeout);
 
-				writeJsonForArchive(resp.getWriter(), archivedJobs);
+				if(!(result instanceof ArchivedJobs)) {
+					throw new RuntimeException("RequestArchivedJobs requires a response of type " +
+							"ArchivedJobs. Instead the response is of type " + result.getClass() +
+							".");
+				} else {
+					final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
+							((ArchivedJobs) result).asJavaCollection());
+
+					writeJsonForArchive(resp.getWriter(), archivedJobs);
+				}
 			}
 			else if("job".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
-				JobResponse response = AkkaUtils.ask(archive,
-						new RequestJob(JobID.fromHexString(jobId)), timeout);
 
-				if(response instanceof JobFound){
-					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
-					writeJsonForArchivedJob(resp.getWriter(), archivedJob);
+				response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)),
+						new Timeout(timeout));
+
+				result = Await.result(response, timeout);
+
+				if(!(result instanceof JobResponse)){
+					throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+							"Instead the response is of type " + result.getClass() + ".");
+				} else {
+					final JobResponse jobResponse = (JobResponse) result;
+
+					if(jobResponse instanceof JobFound){
+						ExecutionGraph archivedJob = ((JobFound) jobResponse).executionGraph();
+						writeJsonForArchivedJob(resp.getWriter(), archivedJob);
 				} else {
-					LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
+						LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
+					}
 				}
 			}
 			else if("groupvertex".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 				String groupvertexId = req.getParameter("groupvertex");
 
-				JobResponse response = AkkaUtils.ask(archive,
-						new RequestJob(JobID.fromHexString(jobId)), timeout);
+				response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)),
+						new Timeout(timeout));
 
-				if (response instanceof JobFound && groupvertexId != null) {
-					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
+				result = Await.result(response, timeout);
 
-					writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
-							JobVertexID.fromHexString(groupvertexId));
+				if(!(result instanceof JobResponse)){
+					throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+							"Instead the response is of type " + result.getClass() + ".");
+				} else {
+					final JobResponse jobResponse = (JobResponse) result;
+
+					if(jobResponse instanceof JobFound && groupvertexId != null){
+						ExecutionGraph archivedJob = ((JobFound)jobResponse).executionGraph();
+
+						writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
+								JobVertexID.fromHexString(groupvertexId));
 				} else {
-					LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
+						LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
+					}
 				}
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout);
-				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
-						JobManagerMessages.getRequestTotalNumberOfSlots(), timeout);
 
-				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
-						"\"slots\": "+numberOfRegisteredSlots+"}");
+				response = Patterns.ask(jobmanager,
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						new Timeout(timeout));
+
+				result = Await.result(response, timeout);
+
+				if(!(result instanceof Integer)) {
+					throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
+							"response of type Integer. Instead the response is of type " +
+							result.getClass() + ".");
+				} else {
+					final int numberOfTaskManagers = (Integer)result;
+
+					final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+							JobManagerMessages.getRequestTotalNumberOfSlots(),
+							new Timeout(timeout));
+
+					final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
+							timeout);
+
+					if(!(resultRegisteredSlots instanceof Integer)) {
+						throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
+								"type Integer. Instead the response is of type " +
+								resultRegisteredSlots.getClass() + ".");
+					} else {
+						final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
+
+						resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
+								"\"slots\": "+numberOfRegisteredSlots+"}");
+					}
+				}
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
-				AkkaUtils.<CancellationResponse>ask(jobmanager,
-						new CancelJob(JobID.fromHexString(jobId)), timeout);
+
+				response = Patterns.ask(jobmanager, new CancelJob(JobID.fromHexString(jobId)),
+						new Timeout(timeout));
+
+				Await.ready(response, timeout);
 			}
 			else if("updates".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
@@ -148,9 +211,20 @@ public class JobManagerInfoServlet extends HttpServlet {
 				writeJsonForVersion(resp.getWriter());
 			}
 			else{
-				Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
-						(jobmanager, JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable();
-				writeJsonForJobs(resp.getWriter(), runningJobs);
+				response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
+						new Timeout(timeout));
+
+				result = Await.result(response, timeout);
+
+				if(!(result instanceof RunningJobs)){
+					throw new RuntimeException("RequestRunningJobs requires a response of type " +
+							"RunningJobs. Instead the response is of type " + result.getClass() + ".");
+				} else {
+					final Iterable<ExecutionGraph> runningJobs =
+							((RunningJobs) result).asJavaIterable();
+
+					writeJsonForJobs(resp.getWriter(), runningJobs);
+				}
 			}
 			
 		} catch (Exception e) {
@@ -273,19 +347,19 @@ public class JobManagerInfoServlet extends HttpServlet {
 	private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) {
 		
 		try {
-		
+
 			wrt.write("[");
-		
+
 			//Serialize job to json
 			wrt.write("{");
 			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-			wrt.write("\"status\": \""+ graph.getState() + "\",");
-			wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ",");
-			wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
-			wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
-			wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ",");
-			wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
+			wrt.write("\"jobname\": \"" + graph.getJobName() + "\",");
+			wrt.write("\"status\": \"" + graph.getState() + "\",");
+			wrt.write("\"SCHEDULED\": " + graph.getStatusTimestamp(JobStatus.CREATED) + ",");
+			wrt.write("\"RUNNING\": " + graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
+			wrt.write("\"FINISHED\": " + graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
+			wrt.write("\"FAILED\": " + graph.getStatusTimestamp(JobStatus.FAILED) + ",");
+			wrt.write("\"CANCELED\": " + graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
 
 			if (graph.getState() == JobStatus.FAILED) {
 				wrt.write("\"failednodes\": [");
@@ -315,101 +389,118 @@ public class JobManagerInfoServlet extends HttpServlet {
 			boolean first = true;
 			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
 				//Write seperator between json objects
-				if(first) {
+				if (first) {
 					first = false;
 				} else {
-					wrt.write(","); }
-				
+					wrt.write(",");
+				}
+
 				wrt.write(JsonFactory.toJson(groupVertex));
-				
+
 			}
 			wrt.write("],");
-			
+
 			// write accumulators
-			AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager,
-					new RequestAccumulatorResults(graph.getJobID()), timeout);
-
-			if (response instanceof AccumulatorResultsFound) {
-				Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap();
-
-				wrt.write("\n\"accumulators\": [");
-				int i = 0;
-				for( Entry<String, Object> accumulator : accMap.entrySet()) {
-					wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\","
-							+ " \"value\": \""+accumulator.getValue().toString()+"\"}\n");
-					if(++i < accMap.size()) {
-						wrt.write(",");
-					}
-				}
-				wrt.write("],\n");
+			final Future<Object> response = Patterns.ask(jobmanager,
+					new RequestAccumulatorResults(graph.getJobID()), new Timeout(timeout));
 
-				wrt.write("\"groupverticetimes\": {");
-				first = true;
-				for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+			Object result = null;
 
-					if(first) {
-						first = false;
-					} else {
-						wrt.write(","); }
+			try {
+				result = Await.result(response, timeout);
+			} catch (Exception ex) {
+				throw new IOException("Could not retrieve the accumulator results from the " +
+						"job manager.", ex);
+			}
 
-					// Calculate start and end time for groupvertex
-					long started = Long.MAX_VALUE;
-					long ended = 0;
+			if (!(result instanceof AccumulatorResultsResponse)) {
+				throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
+						"AccumulatorResultsResponse. Instead the response is of type " +
+						result.getClass() + ".");
+			} else {
+				final AccumulatorResultsResponse accumulatorResponse =
+						(AccumulatorResultsResponse) result;
+
+				if (accumulatorResponse instanceof AccumulatorResultsFound) {
+					Map<String, Object> accMap = ((AccumulatorResultsFound) accumulatorResponse).
+							asJavaMap();
+
+					wrt.write("\n\"accumulators\": [");
+					int i = 0;
+					for (Entry<String, Object> accumulator : accMap.entrySet()) {
+						wrt.write("{ \"name\": \"" + accumulator.getKey() + " (" + accumulator.getValue().getClass().getName() + ")\","
+								+ " \"value\": \"" + accumulator.getValue().toString() + "\"}\n");
+						if (++i < accMap.size()) {
+							wrt.write(",");
+						}
+					}
+					wrt.write("],\n");
 
-					// Take earliest running state and latest endstate of groupmembers
-					for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+					wrt.write("\"groupverticetimes\": {");
+					first = true;
+					for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
 
-						long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
-						if (running != 0 && running < started) {
-							started = running;
+						if (first) {
+							first = false;
+						} else {
+							wrt.write(",");
 						}
 
-						long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
-						long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
-						long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
+						// Calculate start and end time for groupvertex
+						long started = Long.MAX_VALUE;
+						long ended = 0;
 
-						if(finished != 0 && finished > ended) {
-							ended = finished;
-						}
+						// Take earliest running state and latest endstate of groupmembers
+						for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
 
-						if(canceled != 0 && canceled > ended) {
-							ended = canceled;
-						}
+							long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
+							if (running != 0 && running < started) {
+								started = running;
+							}
+
+							long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
+							long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
+							long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
+
+							if (finished != 0 && finished > ended) {
+								ended = finished;
+							}
+
+							if (canceled != 0 && canceled > ended) {
+								ended = canceled;
+							}
+
+							if (failed != 0 && failed > ended) {
+								ended = failed;
+							}
 
-						if(failed != 0 && failed > ended) {
-							ended = failed;
 						}
 
+						wrt.write("\"" + groupVertex.getJobVertexId() + "\": {");
+						wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
+						wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
+						wrt.write("\"STARTED\": " + started + ",");
+						wrt.write("\"ENDED\": " + ended);
+						wrt.write("}");
+
 					}
+				} else {
+					LOG.warn("Could not find accumulator results for job ID " + graph.getJobID());
+				}
 
-					wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
-					wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
-					wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
-					wrt.write("\"STARTED\": "+ started + ",");
-					wrt.write("\"ENDED\": "+ ended);
-					wrt.write("}");
+				wrt.write("}");
 
-				}
-			} else {
-				LOG.warn("Could not find accumulator results for job ID " + graph.getJobID());
-			}
+				wrt.write("}");
 
-			wrt.write("}");
-			
-			wrt.write("}");
-			
-			
-		wrt.write("]");
-		
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-		
+
+				wrt.write("]");
+			}
+		} catch (Exception ex) { // e.g. connection closed by client
+			LOG.info("Info server for jobmanager: Failed to write json for archived jobs, " +
+					"because {}.", StringUtils.stringifyException(ex));
+		}
 	}
-	
-	
+
 	/**
 	 * Writes all updates (events) for a given job since a given time
 	 * 
@@ -419,80 +510,115 @@ public class JobManagerInfoServlet extends HttpServlet {
 	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
 		
 		try {
-			Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
-					JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable();
-			
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + jobId + "\",");
-			wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
-			wrt.write("\"recentjobs\": [");
+			final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager,
+					JobManagerMessages.getRequestRunningJobs(),
+					new Timeout(timeout));
 
-			boolean first = true;
-
-			for(ExecutionGraph g : graphs){
-				if (first) {
-					first = false;
-				} else {
-					wrt.write(",");
-				}
+			Object resultArchivedJobs = null;
 
-				wrt.write("\"" + g.getJobID() + "\"");
+			try{
+				resultArchivedJobs = Await.result(responseArchivedJobs, timeout);
+			} catch (Exception ex) {
+				throw new IOException("Could not retrieve the running jobs from the job manager.", ex);
 			}
 
-			wrt.write("],");
-
-			JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout);
+			if(!(resultArchivedJobs instanceof RunningJobs)){
+				throw new RuntimeException("RequestRunningJobs requires a response of type " +
+						"RunningJobs. Instead the response is of type " +
+						resultArchivedJobs.getClass() + ".");
+			} else {
+				final Iterable<ExecutionGraph> graphs = ((RunningJobs)resultArchivedJobs).
+						asJavaIterable();
 
-			if(response instanceof JobFound){
-				ExecutionGraph graph = ((JobFound)response).executionGraph();
+				//Serialize job to json
+				wrt.write("{");
+				wrt.write("\"jobid\": \"" + jobId + "\",");
+				wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
+				wrt.write("\"recentjobs\": [");
 
-				wrt.write("\"vertexevents\": [");
+				boolean first = true;
 
-				first = true;
-				for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
-					if (first) {
+				for(ExecutionGraph g : graphs){
+					if (first) {
 						first = false;
-					} else {
+					} else {
 						wrt.write(",");
 					}
 
-					wrt.write("{");
-					wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
-							+ "\",");
-					wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
-					wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
-							+ "\"");
-					wrt.write("}");
+					wrt.write("\"" + g.getJobID() + "\"");
 				}
 
 				wrt.write("],");
 
-				wrt.write("\"jobevents\": [");
+				final Future<Object> responseJob = Patterns.ask(jobmanager, new RequestJob(jobId),
+						new Timeout(timeout));
 
-				wrt.write("{");
-				wrt.write("\"newstate\": \"" + graph.getState() + "\",");
-				wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
-				wrt.write("}");
+				Object resultJob = null;
 
-				wrt.write("]");
+				try{
+					resultJob = Await.result(responseJob, timeout);
+				} catch (Exception ex){
+					throw new IOException("Could not retrieve the job with job ID " + jobId +
+						" from the job manager.", ex);
+				}
 
-				wrt.write("}");
+				if(!(resultJob instanceof JobResponse)) {
+					throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+							"Instead the response is of type " + resultJob.getClass() + ".");
+				} else {
+					final JobResponse response = (JobResponse) resultJob;
+
+					if(response instanceof JobFound){
+						ExecutionGraph graph = ((JobFound)response).executionGraph();
+
+						wrt.write("\"vertexevents\": [");
+
+						first = true;
+						for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
+							if (first) {
+								first = false;
+							} else {
+								wrt.write(",");
+							}
+
+							wrt.write("{");
+							wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
+									+ "\",");
+							wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
+							wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
+									+ "\"");
+							wrt.write("}");
+						}
+
+						wrt.write("],");
+
+						wrt.write("\"jobevents\": [");
+
+						wrt.write("{");
+						wrt.write("\"newstate\": \"" + graph.getState() + "\",");
+						wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
+						wrt.write("}");
+
+						wrt.write("]");
+
+						wrt.write("}");
 			} else {
-				wrt.write("\"vertexevents\": [],");
-				wrt.write("\"jobevents\": [");
-				wrt.write("{");
-				wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
-				wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
-				wrt.write("}");
-				wrt.write("]");
-				wrt.write("}");
-				LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
+						wrt.write("\"vertexevents\": [],");
+						wrt.write("\"jobevents\": [");
+						wrt.write("{");
+						wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
+						wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
+						wrt.write("}");
+						wrt.write("]");
+						wrt.write("}");
+						LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
+					}
+				}
 			}
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+			
+		} catch (Exception exception) { // e.g. connection closed by client
+			LOG.info("Info server for jobmanager: Failed to write json updates for job {}, " +
+					"because {}.", jobId, StringUtils.stringifyException(exception));
 		} 
 		
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 026758d..92e0a93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -33,11 +33,12 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.Instance;
 
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestRegisteredTaskManagers$;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -45,6 +46,8 @@ import org.codehaus.jettison.json.JSONObject;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -104,44 +107,63 @@ public class SetupInfoServlet extends HttpServlet {
 	
 	private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
 
-		List<Instance> instances = new ArrayList<Instance>(AkkaUtils.<RegisteredTaskManagers>ask
-				(jobmanager, RequestRegisteredTaskManagers$.MODULE$, timeout).asJavaCollection());
+		final Future<Object> response = Patterns.ask(jobmanager,
+				JobManagerMessages.getRequestRegisteredTaskManagers(),
+				new Timeout(timeout));
+
+		Object obj = null;
+
+		try{
+			obj = Await.result(response, timeout);
+		} catch (Exception ex) {
+			throw new IOException("Could not retrieve all registered task managers from the " +
+					"job manager.", ex);
+		}
+
+		if(!(obj instanceof RegisteredTaskManagers)){
+			throw new RuntimeException("RequestRegisteredTaskManagers should return a response of " +
+					"type RegisteredTaskManagers. Instead the response is of type " +
+					obj.getClass() + ".");
+		} else {
+
+			final List<Instance> instances = new ArrayList<Instance>(
+					((RegisteredTaskManagers) obj).asJavaCollection());
+
+			Collections.sort(instances, INSTANCE_SORTER);
+
+			JSONObject jsonObj = new JSONObject();
+			JSONArray array = new JSONArray();
+			for (Instance instance : instances) {
+				JSONObject objInner = new JSONObject();
+
+				long time = new Date().getTime() - instance.getLastHeartBeat();
+
+				try {
+					objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
+					objInner.put("ipcPort", instance.getTaskManager().path().address().hostPort());
+					objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
+					objInner.put("timeSinceLastHeartbeat", time / 1000);
+					objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
+					objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
+					objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
+					objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
+					objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
+					objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
+					array.put(objInner);
+				} catch (JSONException e) {
+					LOG.warn("Json object creation failed", e);
+				}
 
-		Collections.sort(instances, INSTANCE_SORTER);
-				
-		JSONObject obj = new JSONObject();
-		JSONArray array = new JSONArray();
-		for (Instance instance : instances) {
-			JSONObject objInner = new JSONObject();
-				
-			long time = new Date().getTime() - instance.getLastHeartBeat();
-	
-			try {
-				objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
-				objInner.put("ipcPort", instance.getTaskManager().path().address().hostPort());
-				objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
-				objInner.put("timeSinceLastHeartbeat", time / 1000);
-				objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
-				objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
-				objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
-				objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
-				objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
-				objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
-				array.put(objInner);
 			}
-			catch (JSONException e) {
+			try {
+				jsonObj.put("taskmanagers", array);
+			} catch (JSONException e) {
 				LOG.warn("Json object creation failed", e);
 			}
-			
-		}
-		try {
-			obj.put("taskmanagers", array);
-		} catch (JSONException e) {
-			LOG.warn("Json object creation failed", e);
+
+			PrintWriter w = resp.getWriter();
+			w.write(jsonObj.toString());
 		}
-		
-		PrintWriter w = resp.getWriter();
-		w.write(obj.toString());
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index fdd2636..8e079b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -20,8 +20,9 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -30,6 +31,8 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.util.InstantiationUtil;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
@@ -61,14 +64,25 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	@Override
 	public InputSplit getNextInputSplit() {
 		try {
-			TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager,
-					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID), timeout);
+			final Future<Object> response = Patterns.ask(jobManager,
+					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
+					new Timeout(timeout));
 
-			byte[] serializedData = nextInputSplit.splitData();
-			Object deserialized = InstantiationUtil.deserializeObject(serializedData, usercodeClassLoader);
-			return (InputSplit) deserialized;
-		}
-		catch (Exception e) {
+			final Object result = Await.result(response, timeout);
+
+			if(!(result instanceof TaskManagerMessages.NextInputSplit)){
+				throw new RuntimeException("RequestNextInputSplit requires a response of type " +
+						"NextInputSplit. Instead the response is of type " + result.getClass() + ".");
+			} else {
+				final TaskManagerMessages.NextInputSplit nextInputSplit =
+						(TaskManagerMessages.NextInputSplit) result;
+
+				byte[] serializedData = nextInputSplit.splitData();
+				Object deserialized = InstantiationUtil.deserializeObject(serializedData,
+						usercodeClassLoader);
+				return (InputSplit) deserialized;
+			}
+		} catch (Exception e) {
 			throw new RuntimeException("Requesting the next InputSplit failed.", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index ec5f7fa..c5ea980 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -280,26 +280,13 @@ object AkkaUtils {
   }
 
   def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout:
-  FiniteDuration): ActorRef = {
-    Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout)
+  FiniteDuration): Future[ActorRef] = {
+    system.actorSelection(parent.path / child).resolveOne()(timeout)
   }
 
-  def getReference(path: String)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef
-  = {
-    Await.result(system.actorSelection(path).resolveOne()(timeout), timeout)
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actorSelection: ActorSelection, msg: Any)(implicit timeout: FiniteDuration): T
-    = {
-    val future = Patterns.ask(actorSelection, msg, timeout)
-    Await.result(future, timeout).asInstanceOf[T]
-  }
-
-  @throws(classOf[IOException])
-  def ask[T](actor: ActorRef, msg: Any)(implicit timeout: FiniteDuration): T = {
-    val future = Patterns.ask(actor, msg, timeout)
-    Await.result(future, timeout).asInstanceOf[T]
+  def getReference(path: String)(implicit system: ActorSystem, timeout: FiniteDuration):
+  Future[ActorRef] = {
+    system.actorSelection(path).resolveOne()(timeout)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 676ddda..eec1012 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -23,7 +23,7 @@ import java.net.InetSocketAddress
 
 import akka.actor.Status.Failure
 import akka.actor._
-import akka.pattern.ask
+import akka.pattern.{Patterns, ask}
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.ActorLogMessages
@@ -35,19 +35,16 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 
 import scala.concurrent.{TimeoutException, Await}
 import scala.concurrent.duration.FiniteDuration
+import scala.util.Success
 
 /**
  * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient
  * is used to submit jobs to the JobManager and to request the port of the BlobManager.
  *
- * @param jobManagerURL Akka URL of the JobManager
- * @param timeout Timeout used for futures
+ * @param jobManager ActorRef to jobmanager
  */
-class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends
+class JobClient(jobManager: ActorRef) extends
 Actor with ActorLogMessages with ActorLogging {
-  import context._
-
-  val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout)
 
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
@@ -111,7 +108,16 @@ object JobClient{
 
   def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem, timeout: FiniteDuration):
   ActorRef = {
-    actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL, timeout), JOB_CLIENT_NAME)
+    val jobManagerFuture = AkkaUtils.getReference(jobManagerURL)(actorSystem, timeout)
+
+    val jobManager = try {
+      Await.result(jobManagerFuture, timeout)
+    } catch {
+      case ex: Exception =>
+        throw new RuntimeException("Could not connect to JobManager at " + jobManagerURL + ".", ex)
+    }
+
+    actorSystem.actorOf(Props(classOf[JobClient], jobManager), JOB_CLIENT_NAME)
   }
 
   def startActorWithConfiguration(config: Configuration, localActorSystem: Boolean)
@@ -215,19 +221,27 @@ object JobClient{
 
   /**
    * Uploads the specified jar files of the [[JobGraph]] jobGraph to the BlobServer of the
-   * JobManager. The respective port is retrieved from the JobManager.
+   * JobManager. The respective port is retrieved from the JobManager. This function issues a
+   * blocking call.
    *
    * @param jobGraph Flink job containing the information about the required jars
    * @param hostname Hostname of the instance on which the BlobServer and also the JobManager run
    * @param jobClient ActorRef to the JobClient
    * @param timeout Timeout for futures
-   * @throws java.io.IOException
+   * @throws IOException
    * @return
    */
   @throws(classOf[IOException])
   def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(implicit timeout:
    FiniteDuration): Unit = {
-    val port = AkkaUtils.ask[Int](jobClient, RequestBlobManagerPort)
+
+    val futureBlobPort = Patterns.ask(jobClient, RequestBlobManagerPort, timeout).mapTo[Int]
+
+    val port = try {
+      Await.result(futureBlobPort, timeout)
+    } catch {
+      case e: Exception => throw new IOException("Could not retrieve the server's blob port.", e)
+    }
 
     val serverAddress = new InetSocketAddress(hostname, port)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7cde977..7a67106 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -143,12 +143,13 @@ class JobManager(val configuration: Configuration,
       // TaskManager is already registered
       if(instanceID == null){
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
-        taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort)
+        taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort, profiler)
       } else {
         // to be notified when the taskManager is no longer reachable
         context.watch(taskManager)
 
-        taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
+        taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort,
+          profiler)
       }
 
 
@@ -796,28 +797,17 @@ object JobManager {
     s"akka.tcp://flink@$address/user/$JOB_MANAGER_NAME"
   }
 
-  def getLocalAkkaURL: String = {
-    s"akka://flink/user/$JOB_MANAGER_NAME"
-  }
-
-  def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
-  ActorRef = {
-    AkkaUtils.getChild(jobManager, PROFILER_NAME)
+  def getRemoteAkkaURL(address : InetSocketAddress): String = {
+    getRemoteAkkaURL(address.getHostName + ":" + address.getPort)
   }
 
-  def getEventCollector(jobManager: ActorRef)(implicit system: ActorSystem, timeout:
-  FiniteDuration): ActorRef = {
-    AkkaUtils.getChild(jobManager, EVENT_COLLECTOR_NAME)
-  }
-
-  def getArchivist(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
-  ActorRef = {
-    AkkaUtils.getChild(jobManager, ARCHIVE_NAME)
+  def getLocalAkkaURL: String = {
+    s"akka://flink/user/$JOB_MANAGER_NAME"
   }
 
   def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout:
-  FiniteDuration): ActorRef = {
-    AkkaUtils.getReference(getRemoteAkkaURL(address.getHostName + ":" + address.getPort))
+  FiniteDuration): Future[ActorRef] = {
+    AkkaUtils.getReference(getRemoteAkkaURL(address))
   }
 
   private def checkJavaVersion(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index 8d30741..1a3479a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages
 
+import akka.actor.ActorRef
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
 
 object RegistrationMessages {
@@ -40,16 +41,20 @@ object RegistrationMessages {
    *
    * @param instanceID
    * @param blobPort
+   * @param profilerListener
    */
-  case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int)
+  case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int,
+                                     profilerListener: Option[ActorRef])
 
   /**
    * Denotes that the TaskManager has already been registered at the JobManager.
    *
    * @param instanceID
    * @param blobPort
+   * @param profilerListener the JobManager's profiler, if profiling is enabled
    */
-  case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int)
+  case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int,
+                                profilerListener: Option[ActorRef])
 
   /**
    * Denotes the unsuccessful registration of a task manager at the job manager. This is the
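
With the extra field, "no profiler" is expressed as None rather than a null
ActorRef. A hedged sketch of how the JobManager side builds these messages
(the helper and its names are illustrative, not from the commit):

  import akka.actor.ActorRef
  import org.apache.flink.runtime.instance.InstanceID
  import org.apache.flink.runtime.messages.RegistrationMessages.AcknowledgeRegistration

  // With profiling enabled the JobManager passes Some(profilerRef); with
  // profiling disabled it passes None -- no null checks on the receiving side.
  def ackFor(id: InstanceID, blobPort: Int, profiler: Option[ActorRef]) =
    AcknowledgeRegistration(id, blobPort, profiler)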

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e1cedce..8074386 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -136,6 +136,7 @@ import scala.collection.JavaConverters._
   var registrationAttempts: Int = 0
   var registered: Boolean = false
   var currentJobManager = ActorRef.noSender
+  var profilerListener: Option[ActorRef] = None
   var instanceID: InstanceID = null
   var heartbeatScheduler: Option[Cancellable] = None
 
@@ -218,20 +219,20 @@ import scala.collection.JavaConverters._
         }
       }
 
-    case AcknowledgeRegistration(id, blobPort) =>
+    case AcknowledgeRegistration(id, blobPort, profilerListener) =>
       if(!registered) {
-        finishRegistration(sender, id, blobPort)
+        finishRegistration(sender, id, blobPort, profilerListener)
       } else {
         log.info("The TaskManager {} is already registered at the JobManager {}, but received " +
           "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
       }
 
-    case AlreadyRegistered(id, blobPort) =>
+    case AlreadyRegistered(id, blobPort, profilerListener) =>
       if(!registered) {
         log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
           "though it has not yet finished the registration process.", self.path, sender.path)
 
-        finishRegistration(sender, id, blobPort)
+        finishRegistration(sender, id, blobPort, profilerListener)
       } else {
         // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
         log.info("The TaskManager {} has already been registered at the JobManager {}.",
@@ -461,10 +462,14 @@ import scala.collection.JavaConverters._
 
     heartbeatScheduler = None
 
-    profiler foreach {
-      _.tell(UnregisterProfilingListener, JobManager.getProfiler(currentJobManager))
+    profilerListener foreach {
+      listener =>
+        profiler foreach {
+          _.tell(UnregisterProfilingListener, listener)
+        }
     }
 
+    profilerListener = None
     currentJobManager = ActorRef.noSender
     instanceID = null
     registered = false
@@ -512,8 +517,9 @@ import scala.collection.JavaConverters._
     }
   }
 
-  private def finishRegistration(jobManager: ActorRef, id: InstanceID, blobPort: Int): Unit = {
-    setupTaskManager(jobManager, id, blobPort)
+  private def finishRegistration(jobManager: ActorRef, id: InstanceID, blobPort: Int,
+                                  profilerListener: Option[ActorRef]): Unit = {
+    setupTaskManager(jobManager, id, blobPort, profilerListener)
 
     for (listener <- waitForRegistration) {
       listener ! RegisteredAtJobManager
@@ -522,9 +528,11 @@ import scala.collection.JavaConverters._
     waitForRegistration.clear()
   }
 
-  private def setupTaskManager(jobManager: ActorRef, id: InstanceID, blobPort: Int): Unit = {
+  private def setupTaskManager(jobManager: ActorRef, id: InstanceID, blobPort: Int,
+                                profilerListener: Option[ActorRef]): Unit = {
     registered = true
     currentJobManager = jobManager
+    this.profilerListener = profilerListener
     instanceID = id
 
     // watch job manager to detect when it dies
@@ -537,8 +545,11 @@ import scala.collection.JavaConverters._
     heartbeatScheduler = Some(context.system.scheduler.schedule(
       TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
 
-    profiler foreach {
-      _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+    profilerListener foreach {
+      listener =>
+        profiler foreach {
+          _.tell(RegisterProfilingListener, listener)
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 3ad4238..2ae43f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import static org.junit.Assert.*;
 
+import java.net.InetAddress;
 import java.net.ServerSocket;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -40,7 +41,7 @@ public class JobManagerStartupTest {
 		
 		try {
 			portNum = NetUtils.getAvailablePort();
-			portOccupier = new ServerSocket(portNum);
+			portOccupier = new ServerSocket(portNum, 10, InetAddress.getByName("127.0.0.1"));
 		}
 		catch (Throwable t) {
 			// could not find free port, or open a connection there
@@ -48,13 +49,16 @@ public class JobManagerStartupTest {
 		}
 		
 		try {
-			Tuple2<String, Object> connection = new Tuple2<String, Object>("localhost", portNum);
+			Tuple2<String, Object> connection = new Tuple2<String, Object>("\"127.0.0.1\"", portNum);
 			JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), new Some<Tuple2<String, Object>>(connection));
 			fail("this should throw an exception");
 		}
 		catch (Exception e) {
 			// expected
-			assertTrue(e.getMessage().contains("Address already in use"));
+			if(!e.getMessage().contains("Address already in use")) {
+				e.printStackTrace();
+				fail("Received wrong exception");
+			}
 		}
 		finally {
 			try {
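
The test change pins the port-occupying socket to the loopback interface so
that the subsequent JobManager bind on 127.0.0.1 reliably collides with it.
The core idea, as a standalone sketch (using an ephemeral port rather than the
test's NetUtils helper):

  import java.net.{InetAddress, ServerSocket}

  // Bind to an ephemeral port (0) on loopback only; getLocalPort then yields
  // a port that is guaranteed to be occupied on 127.0.0.1.
  val occupier = new ServerSocket(0, 10, InetAddress.getByName("127.0.0.1"))
  val occupiedPort = occupier.getLocalPort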

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 28076b5..a83886c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -55,6 +55,9 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import scala.None;
+import scala.None$;
+import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -486,7 +489,8 @@ public class TaskManagerTest {
 		public void onReceive(Object message) throws Exception {
 			if(message instanceof RegistrationMessages.RegisterTaskManager){
 				final InstanceID iid = new InstanceID();
-				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(iid, -1),
+				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(iid, -1,
+								Option.<ActorRef>apply(null)),
 						getSelf());
 			}else if(message instanceof JobManagerMessages.UpdateTaskExecutionState){
 				getSender().tell(true, getSelf());
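
The Java mock above passes Option.<ActorRef>apply(null); Option.apply maps
null to None, so this is simply the Java spelling of "no profiler". In Scala
terms:

  import akka.actor.ActorRef

  val noProfiler: Option[ActorRef] = Option(null) // Option.apply(null) == None
  assert(noProfiler.isEmpty)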

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 4d8bea6..d2fe4c2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.jobmanager
 
 import Tasks._
 import akka.actor.ActorSystem
+import akka.pattern.ask
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.akka.AkkaUtils
+import akka.util.Timeout
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode}
-import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingUtils
@@ -32,6 +32,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
+import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
@@ -39,7 +40,8 @@ import scala.util.Random
 @RunWith(classOf[JUnitRunner])
 class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
 WordSpecLike with Matchers with BeforeAndAfterAll {
-  implicit val timeout = 1 minute
+  implicit val duration = 1 minute
+  implicit val timeout = Timeout.durationToTimeout(duration)
 
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
@@ -59,7 +61,10 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val jm = cluster.getJobManager
 
       try {
-        val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
+        val response = (jm ? RequestTotalNumberOfSlots).mapTo[Int]
+
+        val availableSlots = Await.result(response, duration)
+
         availableSlots should equal(1)
 
         within(1 second) {
@@ -89,7 +94,10 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val jm = cluster.getJobManager
 
       try {
-        val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
+        val response = (jm ? RequestTotalNumberOfSlots).mapTo[Int]
+
+        val availableSlots = Await.result(response, duration)
+
         availableSlots should equal(num_tasks)
 
         within(TestingUtils.TESTING_DURATION) {
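
The test now spells out the ask pattern that AkkaUtils.ask used to wrap: ask,
narrow the reply with mapTo, then Await. A minimal standalone sketch (blocking
is acceptable here because it is test code):

  import akka.actor.ActorRef
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.Await
  import scala.concurrent.duration._
  import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots

  implicit val timeout: Timeout = Timeout(1.minute)

  def totalSlots(jm: ActorRef): Int = {
    val response = (jm ? RequestTotalNumberOfSlots).mapTo[Int] // typed Future[Int]
    Await.result(response, 1.minute)
  }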

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
index ec41141..ccd326f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
@@ -92,7 +92,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           expectMsgType[RegisterTaskManager]
 
-          tm ! AcknowledgeRegistration(new InstanceID(), 42)
+          tm ! AcknowledgeRegistration(new InstanceID(), 42, None)
 
           tm ! RefuseRegistration("Should be ignored")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 1794d36..abfd4a9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -23,9 +23,9 @@ import akka.actor.ActorSystem;
 import static akka.pattern.Patterns.ask;
 
 import akka.actor.Props;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.AkkaUtils$;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.None$;
+import scala.Option;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -220,19 +221,34 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		// get messages from ApplicationClient (locally)
 
 		while(true) {
-			Object messageOption = null;
+			Object result = null;
 			try {
-				messageOption = AkkaUtils$.MODULE$.ask(applicationClient, Messages.LocalGetYarnMessage$.MODULE$, akkaDuration);
-			} catch(IOException ioe) {
-				LOG.warn("Error getting the yarn messages locally", ioe);
+				Future<Object> response = Patterns.ask(applicationClient,
+						Messages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
+
+				result = Await.result(response, akkaDuration);
+			} catch(Exception ioe) {
+				LOG.warn("Error retrieving the yarn messages locally", ioe);
 			}
-			if(messageOption instanceof None$) {
-				break;
-			} else if(messageOption instanceof org.apache.flink.yarn.Messages.YarnMessage) {
-				Messages.YarnMessage msg = (Messages.YarnMessage) messageOption;
-				ret.add("["+msg.date()+"] "+msg.message());
+
+			if(!(result instanceof Option)) {
+				throw new RuntimeException("LocalGetYarnMessage requires a response of type " +
+						"Option. Instead the response is of type " +
+						(result == null ? "null" : result.getClass()) + ".");
 			} else {
-				LOG.warn("LocalGetYarnMessage returned unexpected type: "+messageOption);
+				Option messageOption = (Option) result;
+
+				if(messageOption.isEmpty()) {
+					break;
+				} else {
+					Object obj = messageOption.get();
+
+					if(obj instanceof Messages.YarnMessage) {
+						Messages.YarnMessage msg = (Messages.YarnMessage) obj;
+						ret.add("["+msg.date()+"] "+msg.message());
+					} else {
+						LOG.warn("LocalGetYarnMessage returned unexpected type: "+messageOption);
+					}
+				}
 			}
 		}
 		return ret;
@@ -258,8 +274,12 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 			LOG.info("Sending shutdown request to the Application Master");
 			if(applicationClient != ActorRef.noSender()) {
 				try {
-					AkkaUtils$.MODULE$.ask(applicationClient, new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED), akkaDuration);
-				} catch(IOException e) {
+					Future<Object> response = Patterns.ask(applicationClient,
+							new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED),
+							new Timeout(akkaDuration));
+
+					Await.ready(response, akkaDuration);
+				} catch(Exception e) {
 					throw new RuntimeException("Error while stopping YARN Application Client", e);
 				}
 			}
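
Two blocking idioms replace the old AkkaUtils.ask here: Await.result where the
reply's value is needed (the yarn message Option), Await.ready where only
completion matters (the StopYarnSession acknowledgement). The distinction, as
a sketch:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  val f: Future[String] = Future("done")

  val value: String = Await.result(f, 5.seconds) // yields the value; rethrows a failure
  Await.ready(f, 5.seconds)                      // yields the completed Future itself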

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 390835c..3204281 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.TimeUnit
-
 import akka.actor._
-import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.JobManager
@@ -31,6 +29,7 @@ import scala.collection.mutable
 import scala.concurrent.duration._
 
 import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 class ApplicationClient extends Actor with ActorLogMessages with ActorLogging {
   import context._
@@ -67,22 +66,29 @@ class ApplicationClient extends Actor with ActorLogMessages with ActorLogging {
     case LocalRegisterClient(address: String) =>
       val jmAkkaUrl = JobManager.getRemoteAkkaURL(address)
 
-      yarnJobManager = Some(AkkaUtils.getReference(jmAkkaUrl)(system, timeout))
-      yarnJobManager match {
-        case Some(jm) =>
-          // the message came from the FlinkYarnCluster. We send the message to the JobManager.
-          // it is important not to forward the message because the JobManager is storing the
-          // sender as the Application Client (this class).
-          jm ! RegisterClient
-
-          // schedule a periodic status report from the JobManager
-          // request the number of task managers and slots from the job manager
-          pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
-            WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, PollYarnClusterStatus))
-        case None => throw new RuntimeException("Registration at JobManager/ApplicationMaster " +
-          "failed. Job Manager RPC connection has not properly been initialized")
+      val jobManagerFuture = AkkaUtils.getReference(jmAkkaUrl)(system, timeout)
+
+      jobManagerFuture.onComplete {
+        case Success(jm) => self ! JobManagerActorRef(jm)
+        case Failure(t) =>
+          log.error(t, "Registration at JobManager/ApplicationMaster failed. Shutting " +
+            "ApplicationClient down.")
+          self ! PoisonPill
       }
 
+    case JobManagerActorRef(jm) =>
+      yarnJobManager = Some(jm)
+
+      // The message came from the FlinkYarnCluster. We send the message to the JobManager.
+      // It is important not to forward the message because the JobManager stores the
+      // sender as the ApplicationClient (this class).
+      jm ! RegisterClient
+
+      // schedule a periodic status report from the JobManager
+      // request the number of task managers and slots from the job manager
+      pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
+        WAIT_FOR_YARN_INTERVAL, jm, PollYarnClusterStatus))
+
     case msg: StopYarnSession =>
       log.info("Stop yarn session.")
       stopMessageReceiver = Some(sender())
@@ -117,9 +123,7 @@ class ApplicationClient extends Actor with ActorLogMessages with ActorLogging {
 
     // locally forward messages
     case LocalGetYarnMessage =>
-      sender() ! (if( messagesQueue.size == 0) None else messagesQueue.dequeue)
-
-    case _ =>
+      sender() ! (if (messagesQueue.isEmpty) None else Some(messagesQueue.dequeue()))
   }
 
 }
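
Completing the lookup by sending JobManagerActorRef(jm) back to self keeps all
state mutation on the actor's own mailbox thread -- the onComplete callback
runs on an arbitrary dispatcher thread and must not touch actor state. The
stock akka idiom for this is pipeTo; a sketch of that alternative (a fragment
meant to live inside the actor, with different failure handling):

  import akka.pattern.pipe
  import context.dispatcher

  // Instead of onComplete + self ! ...:
  jobManagerFuture.map(JobManagerActorRef(_)).pipeTo(self)

  // A failed future then arrives as akka.actor.Status.Failure(t), which the
  // actor must handle explicitly; the commit's onComplete variant instead
  // stops the ApplicationClient with PoisonPill.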

http://git-wip-us.apache.org/repos/asf/flink/blob/1583dcfc/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
index 47ce782..5cdbbff 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
@@ -20,6 +20,7 @@ package org.apache.flink.yarn
 
 import java.util.Date
 
+import akka.actor.ActorRef
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 
@@ -32,6 +33,8 @@ object Messages {
   case object JobManagerStopped
   case class StartYarnSession(configuration: Configuration, actorSystemPort: Int)
 
+  case class JobManagerActorRef(jobManager: ActorRef)
+
   case object PollContainerCompletion
   case object PollYarnClusterStatus // see org.apache.flink.runtime.yarn.FlinkYarnClusterStatus for
                                     // the response
@@ -41,4 +44,8 @@ object Messages {
   case class LocalRegisterClient(jobManagerAddress: String)
   case object LocalGetYarnMessage // request new message
   case object LocalGetYarnClusterStatus // request the latest cluster status
+
+  def getLocalGetYarnMessage(): AnyRef = {
+    LocalGetYarnMessage
+  }
 }
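
The new getLocalGetYarnMessage() helper exists purely for Java interop: from
Java, a Scala case object is only reachable through its synthetic singleton
field (Messages.LocalGetYarnMessage$.MODULE$), whereas the plain def is
callable as Messages.getLocalGetYarnMessage() -- which is exactly what the
FlinkYarnCluster change above now uses with Patterns.ask.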

