flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [6/7] flink git commit: [FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.
Date Thu, 05 Feb 2015 12:25:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
deleted file mode 100644
index 92d457b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-
-public class SubSlot extends AllocatedSlot {
-
-	private static final long serialVersionUID = 1361615219044538497L;
-	
-
-	private final SharedSlot sharedSlot;
-	
-	private final AbstractID groupId;
-	
-	private final int subSlotNumber;
-	
-	
-	public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) {
-		super(sharedSlot.getAllocatedSlot().getJobID(),
-				sharedSlot.getAllocatedSlot().getInstance(),
-				sharedSlot.getAllocatedSlot().getSlotNumber());
-		
-		this.sharedSlot = sharedSlot;
-		this.groupId = groupId;
-		this.subSlotNumber = subSlotNumber;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public void releaseSlot() {
-		// cancel everything, if there is something. since this is atomically status based,
-		// it will not happen twice if another attempt happened before or concurrently
-		try {
-			cancel();
-		}
-		finally {
-			if (markReleased()) {
-				this.sharedSlot.returnAllocatedSlot(this);
-			}
-		}
-	}
-	
-	public SharedSlot getSharedSlot() {
-		return this.sharedSlot;
-	}
-	
-	public AbstractID getGroupId() {
-		return groupId;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "SubSlot " + subSlotNumber + " (" + super.toString() + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
new file mode 100644
index 0000000..1492ae1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.web;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
+import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
+import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.StringUtils;
+import org.eclipse.jetty.io.EofException;
+import scala.concurrent.duration.FiniteDuration;
+
+public class JobManagerInfoServlet extends HttpServlet {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);
+	
+	/** Underlying JobManager */
+	private final ActorRef jobmanager;
+	private final ActorRef archive;
+	private final FiniteDuration timeout;
+	
+	
+	public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+		this.jobmanager = jobmanager;
+		this.archive = archive;
+		this.timeout = timeout;
+	}
+	
+	
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+			
+		resp.setStatus(HttpServletResponse.SC_OK);
+		resp.setContentType("application/json");
+		
+		try {
+			if("archive".equals(req.getParameter("get"))) {
+				List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
+						.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+						.asJavaCollection());
+
+				writeJsonForArchive(resp.getWriter(), archivedJobs);
+			}
+			else if("job".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				JobResponse response = AkkaUtils.ask(archive,
+						new RequestJob(JobID.fromHexString(jobId)), timeout);
+
+				if(response instanceof JobFound){
+					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
+					writeJsonForArchivedJob(resp.getWriter(), archivedJob);
+				}else{
+					LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
+				}
+			}
+			else if("groupvertex".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				String groupvertexId = req.getParameter("groupvertex");
+
+				JobResponse response = AkkaUtils.ask(archive,
+						new RequestJob(JobID.fromHexString(jobId)), timeout);
+
+				if(response instanceof JobFound && groupvertexId != null){
+					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
+
+					writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
+							JobVertexID.fromHexString(groupvertexId));
+				}else{
+					LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
+				}
+			}
+			else if("taskmanagers".equals(req.getParameter("get"))) {
+				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
+						RequestNumberRegisteredTaskManager$.MODULE$, timeout);
+				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
+						RequestTotalNumberOfSlots$.MODULE$, timeout);
+
+				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
+						"\"slots\": "+numberOfRegisteredSlots+"}");
+			}
+			else if("cancel".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				AkkaUtils.<CancellationResponse>ask(jobmanager,
+						new CancelJob(JobID.fromHexString(jobId)), timeout);
+			}
+			else if("updates".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
+			} else if ("version".equals(req.getParameter("get"))) {
+				writeJsonForVersion(resp.getWriter());
+			}
+			else{
+				Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
+						(jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
+				writeJsonForJobs(resp.getWriter(), runningJobs);
+			}
+			
+		} catch (Exception e) {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			resp.getWriter().print(e.getMessage());
+			if (LOG.isWarnEnabled()) {
+				LOG.warn(StringUtils.stringifyException(e));
+			}
+		}
+	}
+	
+	/**
+	 * Writes ManagementGraph as Json for all recent jobs
+	 * 
+	 * @param wrt
+	 * @param graphs
+	 */
+	private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> graphs) {
+		try {
+			wrt.write("[");
+
+			Iterator<ExecutionGraph> it = graphs.iterator();
+			// Loop Jobs
+			while(it.hasNext()){
+				ExecutionGraph graph = it.next();
+	
+				writeJsonForJob(wrt, graph);
+	
+				//Write seperator between json objects
+				if(it.hasNext()) {
+					wrt.write(",");
+				}
+			}
+			wrt.write("]");
+		
+		} catch (EofException eof) { // Connection closed by client
+			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+		} catch (IOException ioe) { // Connection closed by client	
+			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+		} 
+	}
+	
+	private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) throws IOException {
+		//Serialize job to json
+		wrt.write("{");
+		wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+		wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+		wrt.write("\"status\": \""+ graph.getState() + "\",");
+		wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())+",");
+		
+		// Serialize ManagementGraph to json
+		wrt.write("\"groupvertices\": [");
+		boolean first = true;
+		
+		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+			//Write seperator between json objects
+			if(first) {
+				first = false;
+			} else {
+				wrt.write(","); }
+			
+			wrt.write(JsonFactory.toJson(groupVertex));
+		}
+		wrt.write("]");
+		wrt.write("}");
+			
+	}
+	
+	/**
+	 * Writes Json with a list of currently archived jobs, sorted by time
+	 * 
+	 * @param wrt
+	 * @param graphs
+	 */
+	private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> graphs) {
+		
+		wrt.write("[");
+		
+		// sort jobs by time
+		Collections.sort(graphs,  new Comparator<ExecutionGraph>() {
+			@Override
+			public int compare(ExecutionGraph o1, ExecutionGraph o2) {
+				if(o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
+					return 1;
+				} else {
+					return -1;
+				}
+			}
+			
+		});
+		
+		// Loop Jobs
+		for (int i = 0; i < graphs.size(); i++) {
+			ExecutionGraph graph = graphs.get(i);
+			
+			//Serialize job to json
+			wrt.write("{");
+			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+			wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+			wrt.write("\"status\": \""+ graph.getState() + "\",");
+			wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
+			
+			wrt.write("}");
+			
+			//Write seperator between json objects
+			if(i != graphs.size() - 1) {
+				wrt.write(",");
+			}
+		}
+		wrt.write("]");
+		
+	}
+	
+	/**
+	 * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
+	 * 
+	 * @param wrt
+	 * @param graph
+	 */
+	private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) {
+		
+		try {
+		
+			wrt.write("[");
+		
+			//Serialize job to json
+			wrt.write("{");
+			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
+			wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
+			wrt.write("\"status\": \""+ graph.getState() + "\",");
+			wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ",");
+			wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
+			wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
+			wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ",");
+			wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
+
+			if (graph.getState() == JobStatus.FAILED) {
+				wrt.write("\"failednodes\": [");
+				boolean first = true;
+				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+					if (vertex.getExecutionState() == ExecutionState.FAILED) {
+						SimpleSlot slot = vertex.getCurrentAssignedResource();
+						Throwable failureCause = vertex.getFailureCause();
+						if (slot != null || failureCause != null) {
+							if (first) {
+								first = false;
+							} else {
+								wrt.write(",");
+							}
+							wrt.write("{");
+							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot
+									.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
+							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
+							wrt.write("}");
+						}
+					}
+				}
+				wrt.write("],");
+			}
+
+			// Serialize ManagementGraph to json
+			wrt.write("\"groupvertices\": [");
+			boolean first = true;
+			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+				//Write seperator between json objects
+				if(first) {
+					first = false;
+				} else {
+					wrt.write(","); }
+				
+				wrt.write(JsonFactory.toJson(groupVertex));
+				
+			}
+			wrt.write("],");
+			
+			// write accumulators
+			AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager,
+					new RequestAccumulatorResults(graph.getJobID()), timeout);
+
+			if(response instanceof AccumulatorResultsFound){
+				Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap();
+
+				wrt.write("\n\"accumulators\": [");
+				int i = 0;
+				for( Entry<String, Object> accumulator : accMap.entrySet()) {
+					wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\","
+							+ " \"value\": \""+accumulator.getValue().toString()+"\"}\n");
+					if(++i < accMap.size()) {
+						wrt.write(",");
+					}
+				}
+				wrt.write("],\n");
+
+				wrt.write("\"groupverticetimes\": {");
+				first = true;
+				for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+
+					if(first) {
+						first = false;
+					} else {
+						wrt.write(","); }
+
+					// Calculate start and end time for groupvertex
+					long started = Long.MAX_VALUE;
+					long ended = 0;
+
+					// Take earliest running state and latest endstate of groupmembers
+					for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+
+						long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
+						if (running != 0 && running < started) {
+							started = running;
+						}
+
+						long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
+						long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
+						long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
+
+						if(finished != 0 && finished > ended) {
+							ended = finished;
+						}
+
+						if(canceled != 0 && canceled > ended) {
+							ended = canceled;
+						}
+
+						if(failed != 0 && failed > ended) {
+							ended = failed;
+						}
+
+					}
+
+					wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
+					wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
+					wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
+					wrt.write("\"STARTED\": "+ started + ",");
+					wrt.write("\"ENDED\": "+ ended);
+					wrt.write("}");
+
+				}
+			}else{
+				LOG.warn("Could not find accumulator results for job ID " + graph.getJobID());
+			}
+
+			wrt.write("}");
+			
+			wrt.write("}");
+			
+			
+		wrt.write("]");
+		
+		} catch (EofException eof) { // Connection closed by client
+			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+		} catch (IOException ioe) { // Connection closed by client	
+			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+		} 
+		
+	}
+	
+	
+	/**
+	 * Writes all updates (events) for a given job since a given time
+	 * 
+	 * @param wrt
+	 * @param jobId
+	 */
+	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
+		
+		try {
+			Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
+					RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
+			
+			//Serialize job to json
+			wrt.write("{");
+			wrt.write("\"jobid\": \"" + jobId + "\",");
+			wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
+			wrt.write("\"recentjobs\": [");
+
+			boolean first = true;
+
+			for(ExecutionGraph g : graphs){
+				if(first){
+					first = false;
+				}else{
+					wrt.write(",");
+				}
+
+				wrt.write("\"" + g.getJobID() + "\"");
+			}
+
+			wrt.write("],");
+
+			JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout);
+
+			if(response instanceof JobFound){
+				ExecutionGraph graph = ((JobFound)response).executionGraph();
+
+				wrt.write("\"vertexevents\": [");
+
+				first = true;
+				for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
+					if (first) {
+						first = false;
+					} else {
+						wrt.write(",");
+					}
+
+					wrt.write("{");
+					wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
+							+ "\",");
+					wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
+					wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
+							+ "\"");
+					wrt.write("}");
+				}
+
+				wrt.write("],");
+
+				wrt.write("\"jobevents\": [");
+
+				wrt.write("{");
+				wrt.write("\"newstate\": \"" + graph.getState() + "\",");
+				wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
+				wrt.write("}");
+
+				wrt.write("]");
+
+				wrt.write("}");
+			}else{
+				wrt.write("\"vertexevents\": [],");
+				wrt.write("\"jobevents\": [");
+				wrt.write("{");
+				wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
+				wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
+				wrt.write("}");
+				wrt.write("]");
+				wrt.write("}");
+				LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
+			}
+		} catch (EofException eof) { // Connection closed by client
+			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+		} catch (IOException ioe) { // Connection closed by client	
+			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+		} 
+		
+	}
+	
+	/**
+	 * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
+	 */
+	private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, ExecutionGraph graph,
+													JobVertexID vertexId) {
+		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
+
+		// Serialize ManagementGraph to json
+		wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
+
+		wrt.write("\"verticetimes\": {");
+		boolean first = true;
+		for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
+
+			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+
+				Execution exec = vertex.getCurrentExecutionAttempt();
+
+				if(first) {
+					first = false;
+				} else {
+					wrt.write(","); }
+
+				wrt.write("\""+exec.getAttemptId() +"\": {");
+				wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
+				wrt.write("\"vertexname\": \"" + vertex + "\",");
+				wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
+				wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
+				wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+				wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
+				wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
+				wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
+				wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
+				wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
+				wrt.write("}");
+			}
+
+		}
+		wrt.write("}}");
+	}
+	
+	/**
+	 * Writes the version and the revision of Flink.
+	 * 
+	 * @param wrt
+	 */
+	private void writeJsonForVersion(PrintWriter wrt) {
+		wrt.write("{");
+		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
+		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
+		wrt.write("}");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
deleted file mode 100644
index b842a9b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
-import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
-import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.StringUtils;
-import org.eclipse.jetty.io.EofException;
-import scala.concurrent.duration.FiniteDuration;
-
-
-public class JobmanagerInfoServlet extends HttpServlet {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(JobmanagerInfoServlet.class);
-	
-	/** Underlying JobManager */
-	private final ActorRef jobmanager;
-	private final ActorRef archive;
-	private final FiniteDuration timeout;
-	
-	
-	public JobmanagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
-		this.jobmanager = jobmanager;
-		this.archive = archive;
-		this.timeout = timeout;
-	}
-	
-	
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-			
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType("application/json");
-		
-		try {
-			if("archive".equals(req.getParameter("get"))) {
-				List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
-						.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
-						.asJavaCollection());
-
-				writeJsonForArchive(resp.getWriter(), archivedJobs);
-			}
-			else if("job".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				JobResponse response = AkkaUtils.ask(archive,
-						new RequestJob(JobID.fromHexString(jobId)), timeout);
-
-				if(response instanceof JobFound){
-					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
-					writeJsonForArchivedJob(resp.getWriter(), archivedJob);
-				}else{
-					LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
-				}
-			}
-			else if("groupvertex".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				String groupvertexId = req.getParameter("groupvertex");
-
-				JobResponse response = AkkaUtils.ask(archive,
-						new RequestJob(JobID.fromHexString(jobId)), timeout);
-
-				if(response instanceof JobFound && groupvertexId != null){
-					ExecutionGraph archivedJob = ((JobFound)response).executionGraph();
-
-					writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
-							JobVertexID.fromHexString(groupvertexId));
-				}else{
-					LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
-				}
-			}
-			else if("taskmanagers".equals(req.getParameter("get"))) {
-				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
-						RequestNumberRegisteredTaskManager$.MODULE$, timeout);
-				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
-						RequestTotalNumberOfSlots$.MODULE$, timeout);
-
-				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
-						"\"slots\": "+numberOfRegisteredSlots+"}");
-			}
-			else if("cancel".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				AkkaUtils.<CancellationResponse>ask(jobmanager,
-						new CancelJob(JobID.fromHexString(jobId)), timeout);
-			}
-			else if("updates".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
-			} else if ("version".equals(req.getParameter("get"))) {
-				writeJsonForVersion(resp.getWriter());
-			}
-			else{
-				Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
-						(jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
-				writeJsonForJobs(resp.getWriter(), runningJobs);
-			}
-			
-		} catch (Exception e) {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(e.getMessage());
-			if (LOG.isWarnEnabled()) {
-				LOG.warn(StringUtils.stringifyException(e));
-			}
-		}
-	}
-	
-	/**
-	 * Writes ManagementGraph as Json for all recent jobs
-	 * 
-	 * @param wrt
-	 * @param graphs
-	 */
-	private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> graphs) {
-		try {
-			wrt.write("[");
-
-			Iterator<ExecutionGraph> it = graphs.iterator();
-			// Loop Jobs
-			while(it.hasNext()){
-				ExecutionGraph graph = it.next();
-	
-				writeJsonForJob(wrt, graph);
-	
-				//Write seperator between json objects
-				if(it.hasNext()) {
-					wrt.write(",");
-				}
-			}
-			wrt.write("]");
-		
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-	}
-	
-	private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) throws IOException {
-		//Serialize job to json
-		wrt.write("{");
-		wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-		wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-		wrt.write("\"status\": \""+ graph.getState() + "\",");
-		wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())+",");
-		
-		// Serialize ManagementGraph to json
-		wrt.write("\"groupvertices\": [");
-		boolean first = true;
-		
-		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-			//Write seperator between json objects
-			if(first) {
-				first = false;
-			} else {
-				wrt.write(","); }
-			
-			wrt.write(JsonFactory.toJson(groupVertex));
-		}
-		wrt.write("]");
-		wrt.write("}");
-			
-	}
-	
-	/**
-	 * Writes Json with a list of currently archived jobs, sorted by time
-	 * 
-	 * @param wrt
-	 * @param graphs
-	 */
-	private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> graphs) {
-		
-		wrt.write("[");
-		
-		// sort jobs by time
-		Collections.sort(graphs,  new Comparator<ExecutionGraph>() {
-			@Override
-			public int compare(ExecutionGraph o1, ExecutionGraph o2) {
-				if(o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
-					return 1;
-				} else {
-					return -1;
-				}
-			}
-			
-		});
-		
-		// Loop Jobs
-		for (int i = 0; i < graphs.size(); i++) {
-			ExecutionGraph graph = graphs.get(i);
-			
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-			wrt.write("\"status\": \""+ graph.getState() + "\",");
-			wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
-			
-			wrt.write("}");
-			
-			//Write seperator between json objects
-			if(i != graphs.size() - 1) {
-				wrt.write(",");
-			}
-		}
-		wrt.write("]");
-		
-	}
-	
-	/**
-	 * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
-	 * 
-	 * @param wrt
-	 * @param graph
-	 */
-	private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) {
-		
-		try {
-		
-			wrt.write("[");
-		
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-			wrt.write("\"status\": \""+ graph.getState() + "\",");
-			wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ",");
-			wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
-			wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
-			wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ",");
-			wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
-
-			if (graph.getState() == JobStatus.FAILED) {
-				wrt.write("\"failednodes\": [");
-				boolean first = true;
-				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
-					if (vertex.getExecutionState() == ExecutionState.FAILED) {
-						AllocatedSlot slot = vertex.getCurrentAssignedResource();
-						Throwable failureCause = vertex.getFailureCause();
-						if (slot != null || failureCause != null) {
-							if (first) {
-								first = false;
-							} else {
-								wrt.write(",");
-							}
-							wrt.write("{");
-							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot
-									.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
-							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
-							wrt.write("}");
-						}
-					}
-				}
-				wrt.write("],");
-			}
-
-			// Serialize ManagementGraph to json
-			wrt.write("\"groupvertices\": [");
-			boolean first = true;
-			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-				//Write seperator between json objects
-				if(first) {
-					first = false;
-				} else {
-					wrt.write(","); }
-				
-				wrt.write(JsonFactory.toJson(groupVertex));
-				
-			}
-			wrt.write("],");
-			
-			// write accumulators
-			AccumulatorResultsResponse response = AkkaUtils.ask(jobmanager,
-					new RequestAccumulatorResults(graph.getJobID()), timeout);
-
-			if(response instanceof AccumulatorResultsFound){
-				Map<String, Object> accMap = ((AccumulatorResultsFound)response).asJavaMap();
-
-				wrt.write("\n\"accumulators\": [");
-				int i = 0;
-				for( Entry<String, Object> accumulator : accMap.entrySet()) {
-					wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\","
-							+ " \"value\": \""+accumulator.getValue().toString()+"\"}\n");
-					if(++i < accMap.size()) {
-						wrt.write(",");
-					}
-				}
-				wrt.write("],\n");
-
-				wrt.write("\"groupverticetimes\": {");
-				first = true;
-				for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-
-					if(first) {
-						first = false;
-					} else {
-						wrt.write(","); }
-
-					// Calculate start and end time for groupvertex
-					long started = Long.MAX_VALUE;
-					long ended = 0;
-
-					// Take earliest running state and latest endstate of groupmembers
-					for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
-						long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
-						if (running != 0 && running < started) {
-							started = running;
-						}
-
-						long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
-						long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
-						long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
-
-						if(finished != 0 && finished > ended) {
-							ended = finished;
-						}
-
-						if(canceled != 0 && canceled > ended) {
-							ended = canceled;
-						}
-
-						if(failed != 0 && failed > ended) {
-							ended = failed;
-						}
-
-					}
-
-					wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
-					wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
-					wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
-					wrt.write("\"STARTED\": "+ started + ",");
-					wrt.write("\"ENDED\": "+ ended);
-					wrt.write("}");
-
-				}
-			}else{
-				LOG.warn("Could not find accumulator results for job ID " + graph.getJobID());
-			}
-
-			wrt.write("}");
-			
-			wrt.write("}");
-			
-			
-		wrt.write("]");
-		
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-		
-	}
-	
-	
-	/**
-	 * Writes all updates (events) for a given job since a given time
-	 * 
-	 * @param wrt
-	 * @param jobId
-	 */
-	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
-		
-		try {
-			Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
-					RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
-			
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + jobId + "\",");
-			wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
-			wrt.write("\"recentjobs\": [");
-
-			boolean first = true;
-
-			for(ExecutionGraph g : graphs){
-				if(first){
-					first = false;
-				}else{
-					wrt.write(",");
-				}
-
-				wrt.write("\"" + g.getJobID() + "\"");
-			}
-
-			wrt.write("],");
-
-			JobResponse response = AkkaUtils.ask(jobmanager, new RequestJob(jobId), timeout);
-
-			if(response instanceof JobFound){
-				ExecutionGraph graph = ((JobFound)response).executionGraph();
-
-				wrt.write("\"vertexevents\": [");
-
-				first = true;
-				for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
-					if (first) {
-						first = false;
-					} else {
-						wrt.write(",");
-					}
-
-					wrt.write("{");
-					wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
-							+ "\",");
-					wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
-					wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
-							+ "\"");
-					wrt.write("}");
-				}
-
-				wrt.write("],");
-
-				wrt.write("\"jobevents\": [");
-
-				wrt.write("{");
-				wrt.write("\"newstate\": \"" + graph.getState() + "\",");
-				wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
-				wrt.write("}");
-
-				wrt.write("]");
-
-				wrt.write("}");
-			}else{
-				wrt.write("\"vertexevents\": [],");
-				wrt.write("\"jobevents\": [");
-				wrt.write("{");
-				wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
-				wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
-				wrt.write("}");
-				wrt.write("]");
-				wrt.write("}");
-				LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
-			}
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-		
-	}
-	
-	/**
-	 * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
-	 */
-	private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, ExecutionGraph graph,
-													JobVertexID vertexId) {
-		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
-
-		// Serialize ManagementGraph to json
-		wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
-
-		wrt.write("\"verticetimes\": {");
-		boolean first = true;
-		for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
-
-			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
-				Execution exec = vertex.getCurrentExecutionAttempt();
-
-				if(first) {
-					first = false;
-				} else {
-					wrt.write(","); }
-
-				wrt.write("\""+exec.getAttemptId() +"\": {");
-				wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
-				wrt.write("\"vertexname\": \"" + vertex + "\",");
-				wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
-				wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
-				wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
-				wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
-				wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
-				wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
-				wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
-				wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
-				wrt.write("}");
-			}
-
-		}
-		wrt.write("}}");
-	}
-	
-	/**
-	 * Writes the version and the revision of Flink.
-	 * 
-	 * @param wrt
-	 */
-	private void writeJsonForVersion(PrintWriter wrt) {
-		wrt.write("{");
-		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
-		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
-		wrt.write("}");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 103c1be..8e46692 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
@@ -38,7 +38,7 @@ public class JsonFactory {
 		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
 		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
 		
-		AllocatedSlot slot = vertex.getCurrentAssignedResource();
+		SimpleSlot slot = vertex.getCurrentAssignedResource();
 		String instanceName = slot == null ? "(null)" : slot.getInstance()
 				.getInstanceConnectionInfo().getFQDNHostname();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 733cf5e..71347ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -128,7 +128,7 @@ public class WebInfoServer {
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
-		servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager,
+		servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager,
 				archive, timeout)), "/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
 		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)),

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
index 3a53801..ce7db25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
@@ -26,12 +26,11 @@ import java.util.Set;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
 import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
 
-
 public class JobProfilingData {
 
 	private final ExecutionGraph executionGraph;
@@ -59,7 +58,7 @@ public class JobProfilingData {
 	public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) {
 
 		for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
-			AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
+			SimpleSlot slot = executionVertex.getCurrentAssignedResource();
 			if (slot != null && slot.getInstance().getPath().equals(
 					instanceProfilingData.getInstancePath()))
 			{
@@ -76,7 +75,7 @@ public class JobProfilingData {
 		final Set<Instance> tempSet = new HashSet<Instance>();
 		
 		for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
-			AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
+			SimpleSlot slot = executionVertex.getCurrentAssignedResource();
 			if (slot != null) {
 				tempSet.add(slot.getInstance());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c780589..532f7f8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -437,6 +437,7 @@ class JobManager(val configuration: Configuration)
 
     case Terminated(taskManager) => {
       log.info("Task manager {} terminated.", taskManager.path)
+      JobManager.LOG.warn(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
       context.unwatch(taskManager)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index f4c95c9..4ee28e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -120,7 +120,7 @@ public class ExecutionGraphDeploymentTest {
 
 			final Instance instance = getInstance(simpleTaskManager);
 
-			final AllocatedSlot slot = instance.allocateSlot(jobId);
+			final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 21caca0..e8e1f7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -33,11 +33,11 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -68,7 +68,7 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 	
-	public static void setVertexResource(ExecutionVertex vertex, AllocatedSlot slot) {
+	public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) {
 		try {
 			Execution exec = vertex.getCurrentExecutionAttempt();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 4a8c69b..2f1af70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -30,7 +30,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -76,7 +76,7 @@ public class ExecutionStateProgressTest {
 			// mock resources and mock taskmanager
 			ActorRef taskManager = system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				AllocatedSlot slot = getInstance(taskManager).allocateSlot(jid);
+				SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid);
 				ee.deployToSlot(slot);
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 3c1b174..ee89954 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -37,7 +37,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -148,7 +148,7 @@ public class ExecutionVertexCancelTest {
 						new TaskOperationResult(execId, false))));
 
 				Instance instance = getInstance(taskManager);
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				vertex.deployToSlot(slot);
 
@@ -223,7 +223,7 @@ public class ExecutionVertexCancelTest {
 							TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
 
 					Instance instance = getInstance(taskManager);
-					AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					vertex.deployToSlot(slot);
 
@@ -296,7 +296,7 @@ public class ExecutionVertexCancelTest {
 									TaskOperationResult(execId, true))));
 
 					Instance instance = getInstance(taskManager);
-					AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -344,7 +344,7 @@ public class ExecutionVertexCancelTest {
 							TaskOperationResult(execId, true))));
 
 					Instance instance = getInstance(taskManager);
-					AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -400,7 +400,7 @@ public class ExecutionVertexCancelTest {
 							TaskOperationResult(execId, false))));
 
 					Instance instance = getInstance(taskManager);
-					AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -441,7 +441,7 @@ public class ExecutionVertexCancelTest {
 							CancelSequenceTaskManagerCreator()));
 
 					Instance instance = getInstance(taskManager);
-					AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -487,7 +487,7 @@ public class ExecutionVertexCancelTest {
 							)));
 
 					Instance instance = getInstance(taskManager);
-					AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -544,7 +544,7 @@ public class ExecutionVertexCancelTest {
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
 				Instance instance = getInstance(ActorRef.noSender());
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 				
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -587,7 +587,7 @@ public class ExecutionVertexCancelTest {
 				setVertexState(vertex, ExecutionState.CANCELING);
 				
 				Instance instance = getInstance(ActorRef.noSender());
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 				
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -601,7 +601,7 @@ public class ExecutionVertexCancelTest {
 						AkkaUtils.DEFAULT_TIMEOUT());
 				
 				Instance instance = getInstance(ActorRef.noSender());
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 				
 				setVertexResource(vertex, slot);
 				setVertexState(vertex, ExecutionState.CANCELING);

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 330292b..c0d1db8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -29,8 +29,8 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
@@ -64,7 +64,7 @@ public class ExecutionVertexDeploymentTest {
 			// mock taskmanager to simply accept the call
 			Instance instance = getInstance(tm);
 
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 			
@@ -106,8 +106,8 @@ public class ExecutionVertexDeploymentTest {
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 			
@@ -151,7 +151,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			final Instance instance = getInstance(simpleTaskManager);
 
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 			
@@ -206,7 +206,7 @@ public class ExecutionVertexDeploymentTest {
 					Props.create(SimpleFailingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 			
@@ -242,8 +242,8 @@ public class ExecutionVertexDeploymentTest {
 					Props.create(SimpleFailingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -289,9 +289,8 @@ public class ExecutionVertexDeploymentTest {
 
 			final TestActorRef simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
-
 			final Instance instance = getInstance(simpleTaskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
@@ -343,8 +342,7 @@ public class ExecutionVertexDeploymentTest {
 					TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
 
 			final Instance instance = getInstance(simpleTaskManager);
-
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 24ac44b..8230433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -30,8 +30,8 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -65,7 +65,7 @@ public class ExecutionVertexSchedulingTest {
 		try {
 			// a slot that cannot be deployed to
 			final Instance instance = getInstance(ActorRef.noSender());
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			slot.cancel();
 			assertFalse(slot.isReleased());
 			
@@ -96,7 +96,7 @@ public class ExecutionVertexSchedulingTest {
 		try {
 			// a slot that cannot be deployed to
 			final Instance instance = getInstance(ActorRef.noSender());
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			slot.cancel();
 			assertFalse(slot.isReleased());
 			
@@ -136,7 +136,7 @@ public class ExecutionVertexSchedulingTest {
 					.SimpleAcknowledgingTaskManager.class));
 
 			final Instance instance = getInstance(tm);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
index 15cdefb..94bdef1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
@@ -36,7 +36,7 @@ public class AllocatedSlotTest {
 		try {
 			// cancel, then release
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				assertTrue(slot.isAlive());
 				
 				slot.cancel();
@@ -52,7 +52,7 @@ public class AllocatedSlotTest {
 			
 			// release immediately
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				assertTrue(slot.isAlive());
 				
 				slot.releaseSlot();
@@ -75,32 +75,32 @@ public class AllocatedSlotTest {
 			
 			// assign to alive slot
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				
 				assertTrue(slot.setExecutedVertex(ev));
-				assertEquals(ev, slot.getExecutedVertex());
+				assertEquals(ev, slot.getExecution());
 				
 				// try to add another one
 				assertFalse(slot.setExecutedVertex(ev_2));
-				assertEquals(ev, slot.getExecutedVertex());
+				assertEquals(ev, slot.getExecution());
 			}
 			
 			// assign to canceled slot
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				slot.cancel();
 				
 				assertFalse(slot.setExecutedVertex(ev));
-				assertNull(slot.getExecutedVertex());
+				assertNull(slot.getExecution());
 			}
 			
 			// assign to released
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				slot.releaseSlot();
 				
 				assertFalse(slot.setExecutedVertex(ev));
-				assertNull(slot.getExecutedVertex());
+				assertNull(slot.getExecution());
 			}
 		}
 		catch (Exception e) {
@@ -114,9 +114,9 @@ public class AllocatedSlotTest {
 		try {
 			Execution ev = mock(Execution.class);
 			
-			AllocatedSlot slot = getSlot();
+			SimpleSlot slot = getSlot();
 			assertTrue(slot.setExecutedVertex(ev));
-			assertEquals(ev, slot.getExecutedVertex());
+			assertEquals(ev, slot.getExecution());
 			
 			slot.cancel();
 			slot.releaseSlot();
@@ -130,12 +130,12 @@ public class AllocatedSlotTest {
 		}
 	}
 	
-	public static AllocatedSlot getSlot() throws Exception {
+	public static SimpleSlot getSlot() throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 		
 		Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1);
-		return instance.allocateSlot(new JobID());
+		return instance.allocateSimpleSlot(new JobID());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index a7820a3..47f7a2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -45,10 +45,10 @@ public class InstanceTest {
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 			
-			AllocatedSlot slot1 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot2 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot3 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot4 = instance.allocateSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
 			
 			assertNotNull(slot1);
 			assertNotNull(slot2);
@@ -61,7 +61,7 @@ public class InstanceTest {
 					slot3.getSlotNumber() + slot4.getSlotNumber());
 			
 			// no more slots
-			assertNull(instance.allocateSlot(new JobID()));
+			assertNull(instance.allocateSimpleSlot(new JobID()));
 			try {
 				instance.returnAllocatedSlot(slot2);
 				fail("instance accepted a non-cancelled slot.");
@@ -109,9 +109,9 @@ public class InstanceTest {
 			
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 			
-			AllocatedSlot slot1 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot2 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot3 = instance.allocateSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
 			
 			instance.markDead();
 			
@@ -139,9 +139,9 @@ public class InstanceTest {
 			
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 			
-			AllocatedSlot slot1 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot2 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot3 = instance.allocateSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
 			
 			instance.cancelAndReleaseAllSlots();
 			


Mime
View raw message