flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [2/3] flink git commit: [FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.
Date Mon, 09 Feb 2015 12:59:12 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
deleted file mode 100644
index 0b19a5f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-
-public class SubSlot extends AllocatedSlot {
-
-	private final SharedSlot sharedSlot;
-	
-	private final AbstractID groupId;
-	
-	private final int subSlotNumber;
-	
-	
-	public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) {
-		super(sharedSlot.getAllocatedSlot().getJobID(),
-				sharedSlot.getAllocatedSlot().getInstance(),
-				sharedSlot.getAllocatedSlot().getSlotNumber());
-		
-		this.sharedSlot = sharedSlot;
-		this.groupId = groupId;
-		this.subSlotNumber = subSlotNumber;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public void releaseSlot() {
-		// cancel everything, if there is something. since this is atomically status based,
-		// it will not happen twice if another attempt happened before or concurrently
-		try {
-			cancel();
-		}
-		finally {
-			if (markReleased()) {
-				this.sharedSlot.returnAllocatedSlot(this);
-			}
-		}
-	}
-	
-	public SharedSlot getSharedSlot() {
-		return this.sharedSlot;
-	}
-	
-	public AbstractID getGroupId() {
-		return groupId;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "SubSlot " + subSlotNumber + " (" + super.toString() + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
new file mode 100644
index 0000000..10796b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.web;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.event.job.AbstractEvent;
+import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
+import org.apache.flink.runtime.event.job.JobEvent;
+import org.apache.flink.runtime.event.job.RecentJobEvent;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.StringUtils;
+import org.eclipse.jetty.io.EofException;
+
+public class JobManagerInfoServlet extends HttpServlet {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);
+	
+	/** Underlying JobManager */
+	private final JobManager jobmanager;
+	
+	
+	public JobManagerInfoServlet(JobManager jobmanager) {
+		this.jobmanager = jobmanager;
+	}
+	
+	
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+			
+		resp.setStatus(HttpServletResponse.SC_OK);
+		resp.setContentType("application/json");
+		
+		try {
+			if("archive".equals(req.getParameter("get"))) {
+				writeJsonForArchive(resp.getWriter(), jobmanager.getOldJobs());
+			}
+			else if("job".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				writeJsonForArchivedJob(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)));
+			}
+			else if("groupvertex".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				String groupvertexId = req.getParameter("groupvertex");
+				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId));
+			}
+			else if("taskmanagers".equals(req.getParameter("get"))) {
+				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
+			}
+			else if("cancel".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				jobmanager.cancelJob(JobID.fromHexString(jobId));
+			}
+			else if("updates".equals(req.getParameter("get"))) {
+				String jobId = req.getParameter("job");
+				writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
+			} else if ("version".equals(req.getParameter("get"))) {
+				writeJsonForVersion(resp.getWriter());
+			}
+			else{
+				writeJsonForJobs(resp.getWriter(), jobmanager.getRecentJobs());
+			}
+			
+		} catch (Exception e) {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			resp.getWriter().print(e.getMessage());
+			if (LOG.isWarnEnabled()) {
+				LOG.warn(StringUtils.stringifyException(e));
+			}
+		}
+	}
+	
+	/**
+	 * Writes ManagementGraph as Json for all recent jobs
+	 * 
+	 * @param wrt
+	 * @param jobs
+	 */
+	private void writeJsonForJobs(PrintWriter wrt, List<RecentJobEvent> jobs) {
+		
+		try {
+		
+			wrt.write("[");
+			
+			// Loop Jobs
+			for (int i = 0; i < jobs.size(); i++) {
+				RecentJobEvent jobEvent = jobs.get(i);
+	
+				writeJsonForJob(wrt, jobEvent);
+	
+				//Write separator between json objects
+				if(i != jobs.size() - 1) {
+					wrt.write(",");
+				}
+			}
+			wrt.write("]");
+		
+		} catch (EofException eof) { // Connection closed by client
+			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+		} catch (IOException ioe) { // Connection closed by client	
+			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+		} 
+		
+	}
+	
+	private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) throws IOException {
+		
+		ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
+		
+		//Serialize job to json
+		wrt.write("{");
+		wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
+		wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
+		wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
+		wrt.write("\"time\": " + jobEvent.getTimestamp()+",");
+		
+		// Serialize ManagementGraph to json
+		wrt.write("\"groupvertices\": [");
+		boolean first = true;
+		
+		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+			//Write separator between json objects
+			if(first) {
+				first = false;
+			} else {
+				wrt.write(","); }
+			
+			wrt.write(JsonFactory.toJson(groupVertex));
+		}
+		wrt.write("]");
+		wrt.write("}");
+			
+	}
+	
+	/**
+	 * Writes Json with a list of currently archived jobs, sorted by time
+	 * 
+	 * @param wrt
+	 * @param jobs
+	 */
+	private void writeJsonForArchive(PrintWriter wrt, List<RecentJobEvent> jobs) {
+		
+		wrt.write("[");
+		
+		// sort jobs by time
+		Collections.sort(jobs,  new Comparator<RecentJobEvent>() {
+			@Override
+			public int compare(RecentJobEvent o1, RecentJobEvent o2) {
+				if(o1.getTimestamp() < o2.getTimestamp()) {
+					return 1;
+				} else {
+					return -1;
+				}
+			}
+			
+		});
+		
+		// Loop Jobs
+		for (int i = 0; i < jobs.size(); i++) {
+			RecentJobEvent jobEvent = jobs.get(i);
+			
+			//Serialize job to json
+			wrt.write("{");
+			wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
+			wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
+			wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
+			wrt.write("\"time\": " + jobEvent.getTimestamp());
+			
+			wrt.write("}");
+			
+			//Write separator between json objects
+			if(i != jobs.size() - 1) {
+				wrt.write(",");
+			}
+		}
+		wrt.write("]");
+		
+	}
+	
+	/**
+	 * Writes information about an archived job in JSON format, including groupvertices and groupverticetimes
+	 * 
+	 * @param wrt
+	 * @param jobEvent
+	 */
+	private void writeJsonForArchivedJob(PrintWriter wrt, RecentJobEvent jobEvent) {
+		
+		try {
+		
+			wrt.write("[");
+		
+			ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
+			
+			//Serialize job to json
+			wrt.write("{");
+			wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
+			wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
+			wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
+			wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ",");
+			wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
+			wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
+			wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ",");
+			wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
+
+			if (jobEvent.getJobStatus() == JobStatus.FAILED) {
+				wrt.write("\"failednodes\": [");
+				boolean first = true;
+				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+					if (vertex.getExecutionState() == ExecutionState.FAILED) {
+						SimpleSlot slot = vertex.getCurrentAssignedResource();
+						Throwable failureCause = vertex.getFailureCause();
+						if (slot != null || failureCause != null) {
+							if (first) {
+								first = false;
+							} else {
+								wrt.write(",");
+							}
+							wrt.write("{");
+							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
+							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
+							wrt.write("}");
+						}
+					}
+				}
+				wrt.write("],");
+			}
+
+			// Serialize ManagementGraph to json
+			wrt.write("\"groupvertices\": [");
+			boolean first = true;
+			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+				//Write separator between json objects
+				if(first) {
+					first = false;
+				} else {
+					wrt.write(","); }
+				
+				wrt.write(JsonFactory.toJson(groupVertex));
+				
+			}
+			wrt.write("],");
+			
+			// write accumulators
+			Map<String, Object> accMap = AccumulatorHelper.toResultMap(jobmanager.getAccumulators(jobEvent.getJobID()));
+			
+			wrt.write("\n\"accumulators\": [");
+			int i = 0;
+			for( Entry<String, Object> accumulator : accMap.entrySet()) {
+				wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\","
+						+ " \"value\": \""+accumulator.getValue().toString()+"\"}\n");
+				if(++i < accMap.size()) {
+					wrt.write(",");
+				}
+			}
+			wrt.write("],\n");
+			
+			wrt.write("\"groupverticetimes\": {");
+			first = true;
+			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+				
+				if(first) {
+					first = false;
+				} else {
+					wrt.write(","); }
+				
+				// Calculate start and end time for groupvertex
+				long started = Long.MAX_VALUE;
+				long ended = 0;
+				
+				// Take earliest running state and latest endstate of groupmembers
+				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+					
+					long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
+					if (running != 0 && running < started) {
+						started = running;
+					}
+					
+					long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
+					long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
+					long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
+					
+					if(finished != 0 && finished > ended) {
+						ended = finished;
+					}
+					
+					if(canceled != 0 && canceled > ended) {
+						ended = canceled;
+					}
+					
+					if(failed != 0 && failed > ended) {
+						ended = failed;
+					}
+
+				}
+				
+				wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
+				wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
+				wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
+				wrt.write("\"STARTED\": "+ started + ",");
+				wrt.write("\"ENDED\": "+ ended);
+				wrt.write("}");
+				
+			}
+
+			wrt.write("}");
+			
+			wrt.write("}");
+			
+			
+		wrt.write("]");
+		
+		} catch (EofException eof) { // Connection closed by client
+			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+		} catch (IOException ioe) { // Connection closed by client	
+			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+		} 
+		
+	}
+	
+	
+	/**
+	 * Writes all updates (events) for a given job
+	 * 
+	 * @param wrt
+	 * @param jobId
+	 */
+	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
+		
+		try {
+			
+			List<AbstractEvent> events = jobmanager.getEvents(jobId);
+			
+			//Serialize job to json
+			wrt.write("{");
+			wrt.write("\"jobid\": \"" + jobId + "\",");
+			wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
+			wrt.write("\"recentjobs\": [");
+				
+			boolean first = true;
+			for(RecentJobEvent rje: jobmanager.getRecentJobs()) {
+				if(first) {
+					first = false;
+				} else {
+					wrt.write(","); }
+				
+				wrt.write("\""+rje.getJobID().toString()+"\""); 
+			}
+					
+			wrt.write("],");
+			
+			wrt.write("\"vertexevents\": [");
+		
+			first = true;
+			for (AbstractEvent event: events) {
+				
+				if (event instanceof ExecutionStateChangeEvent) {
+					
+					if(first) {
+						first = false;
+					} else {
+						wrt.write(","); }
+					
+					ExecutionStateChangeEvent vertexevent = (ExecutionStateChangeEvent) event;
+					wrt.write("{");
+					wrt.write("\"vertexid\": \"" + vertexevent.getExecutionAttemptID() + "\",");
+					wrt.write("\"newstate\": \"" + vertexevent.getNewExecutionState() + "\",");
+					wrt.write("\"timestamp\": \"" + vertexevent.getTimestamp() + "\"");
+					wrt.write("}");
+				}
+			}
+			
+			wrt.write("],");
+			
+			wrt.write("\"jobevents\": [");
+			
+			first = true;
+			for(AbstractEvent event: events) {
+				
+				if( event instanceof JobEvent) {
+					
+					if(first) {
+						first = false;
+					} else {
+						wrt.write(","); }
+					
+					JobEvent jobevent = (JobEvent) event;
+					wrt.write("{");
+					wrt.write("\"newstate\": \"" + jobevent.getCurrentJobStatus() + "\",");
+					wrt.write("\"timestamp\": \"" + jobevent.getTimestamp() + "\"");
+					wrt.write("}");
+				}
+			}
+			
+			wrt.write("]");
+			
+			wrt.write("}");
+			
+		
+		} catch (EofException eof) { // Connection closed by client
+			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
+		} catch (IOException ioe) { // Connection closed by client	
+			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
+		} 
+		
+	}
+	
+	/**
+	 * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
+	 */
+	private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent jobEvent, JobVertexID vertexId) {
+		try {
+			ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
+			
+			ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
+			
+			// Serialize ManagementGraph to json
+			wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
+			
+			wrt.write("\"verticetimes\": {");
+			boolean first = true;
+			for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
+				
+				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+					
+					Execution exec = vertex.getCurrentExecutionAttempt();
+					
+					if(first) {
+						first = false;
+					} else {
+						wrt.write(","); }
+					
+					wrt.write("\""+exec.getAttemptId() +"\": {");
+					wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
+					wrt.write("\"vertexname\": \"" + vertex + "\",");
+					wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
+					wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
+					wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+					wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
+					wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
+					wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
+					wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
+					wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
+					wrt.write("}");
+				}
+				
+			}
+			wrt.write("}}");
+			
+		}
+		catch (IOException ioe) { // Connection closed by client
+			String message = "Info server for jobmanager: Connection closed by client - " + ioe.getClass().getSimpleName();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(message, ioe);
+			}
+			else if (LOG.isInfoEnabled()) {
+				LOG.info(message);
+			}
+		} 
+	}
+	
+	/**
+	 * Writes the version and the revision of Flink.
+	 * 
+	 * @param wrt
+	 */
+	private void writeJsonForVersion(PrintWriter wrt) {
+		wrt.write("{");
+		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
+		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
+		wrt.write("}");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
deleted file mode 100644
index ef5a246..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.event.job.AbstractEvent;
-import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
-import org.apache.flink.runtime.event.job.JobEvent;
-import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.StringUtils;
-import org.eclipse.jetty.io.EofException;
-
-
-public class JobmanagerInfoServlet extends HttpServlet {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(JobmanagerInfoServlet.class);
-	
-	/** Underlying JobManager */
-	private final JobManager jobmanager;
-	
-	
-	public JobmanagerInfoServlet(JobManager jobmanager) {
-		this.jobmanager = jobmanager;
-	}
-	
-	
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-			
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType("application/json");
-		
-		try {
-			if("archive".equals(req.getParameter("get"))) {
-				writeJsonForArchive(resp.getWriter(), jobmanager.getOldJobs());
-			}
-			else if("job".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				writeJsonForArchivedJob(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)));
-			}
-			else if("groupvertex".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				String groupvertexId = req.getParameter("groupvertex");
-				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), JobVertexID.fromHexString(groupvertexId));
-			}
-			else if("taskmanagers".equals(req.getParameter("get"))) {
-				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getTotalNumberOfRegisteredSlots()+"}");
-			}
-			else if("cancel".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				jobmanager.cancelJob(JobID.fromHexString(jobId));
-			}
-			else if("updates".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
-			} else if ("version".equals(req.getParameter("get"))) {
-				writeJsonForVersion(resp.getWriter());
-			}
-			else{
-				writeJsonForJobs(resp.getWriter(), jobmanager.getRecentJobs());
-			}
-			
-		} catch (Exception e) {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(e.getMessage());
-			if (LOG.isWarnEnabled()) {
-				LOG.warn(StringUtils.stringifyException(e));
-			}
-		}
-	}
-	
-	/**
-	 * Writes ManagementGraph as Json for all recent jobs
-	 * 
-	 * @param wrt
-	 * @param jobs
-	 */
-	private void writeJsonForJobs(PrintWriter wrt, List<RecentJobEvent> jobs) {
-		
-		try {
-		
-			wrt.write("[");
-			
-			// Loop Jobs
-			for (int i = 0; i < jobs.size(); i++) {
-				RecentJobEvent jobEvent = jobs.get(i);
-	
-				writeJsonForJob(wrt, jobEvent);
-	
-				//Write seperator between json objects
-				if(i != jobs.size() - 1) {
-					wrt.write(",");
-				}
-			}
-			wrt.write("]");
-		
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-		
-	}
-	
-	private void writeJsonForJob(PrintWriter wrt, RecentJobEvent jobEvent) throws IOException {
-		
-		ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
-		
-		//Serialize job to json
-		wrt.write("{");
-		wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
-		wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
-		wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
-		wrt.write("\"time\": " + jobEvent.getTimestamp()+",");
-		
-		// Serialize ManagementGraph to json
-		wrt.write("\"groupvertices\": [");
-		boolean first = true;
-		
-		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-			//Write seperator between json objects
-			if(first) {
-				first = false;
-			} else {
-				wrt.write(","); }
-			
-			wrt.write(JsonFactory.toJson(groupVertex));
-		}
-		wrt.write("]");
-		wrt.write("}");
-			
-	}
-	
-	/**
-	 * Writes Json with a list of currently archived jobs, sorted by time
-	 * 
-	 * @param wrt
-	 * @param jobs
-	 */
-	private void writeJsonForArchive(PrintWriter wrt, List<RecentJobEvent> jobs) {
-		
-		wrt.write("[");
-		
-		// sort jobs by time
-		Collections.sort(jobs,  new Comparator<RecentJobEvent>() {
-			@Override
-			public int compare(RecentJobEvent o1, RecentJobEvent o2) {
-				if(o1.getTimestamp() < o2.getTimestamp()) {
-					return 1;
-				} else {
-					return -1;
-				}
-			}
-			
-		});
-		
-		// Loop Jobs
-		for (int i = 0; i < jobs.size(); i++) {
-			RecentJobEvent jobEvent = jobs.get(i);
-			
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
-			wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
-			wrt.write("\"time\": " + jobEvent.getTimestamp());
-			
-			wrt.write("}");
-			
-			//Write seperator between json objects
-			if(i != jobs.size() - 1) {
-				wrt.write(",");
-			}
-		}
-		wrt.write("]");
-		
-	}
-	
-	/**
-	 * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
-	 * 
-	 * @param wrt
-	 * @param jobEvent
-	 */
-	private void writeJsonForArchivedJob(PrintWriter wrt, RecentJobEvent jobEvent) {
-		
-		try {
-		
-			wrt.write("[");
-		
-			ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
-			
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + jobEvent.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + jobEvent.getJobName()+"\",");
-			wrt.write("\"status\": \""+ jobEvent.getJobStatus() + "\",");
-			wrt.write("\"SCHEDULED\": "+ graph.getStatusTimestamp(JobStatus.CREATED) + ",");
-			wrt.write("\"RUNNING\": "+ graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
-			wrt.write("\"FINISHED\": "+ graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
-			wrt.write("\"FAILED\": "+ graph.getStatusTimestamp(JobStatus.FAILED) + ",");
-			wrt.write("\"CANCELED\": "+ graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
-
-			if (jobEvent.getJobStatus() == JobStatus.FAILED) {
-				wrt.write("\"failednodes\": [");
-				boolean first = true;
-				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
-					if (vertex.getExecutionState() == ExecutionState.FAILED) {
-						AllocatedSlot slot = vertex.getCurrentAssignedResource();
-						Throwable failureCause = vertex.getFailureCause();
-						if (slot != null || failureCause != null) {
-							if (first) {
-								first = false;
-							} else {
-								wrt.write(",");
-							}
-							wrt.write("{");
-							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
-							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
-							wrt.write("}");
-						}
-					}
-				}
-				wrt.write("],");
-			}
-
-			// Serialize ManagementGraph to json
-			wrt.write("\"groupvertices\": [");
-			boolean first = true;
-			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-				//Write seperator between json objects
-				if(first) {
-					first = false;
-				} else {
-					wrt.write(","); }
-				
-				wrt.write(JsonFactory.toJson(groupVertex));
-				
-			}
-			wrt.write("],");
-			
-			// write accumulators
-			Map<String, Object> accMap = AccumulatorHelper.toResultMap(jobmanager.getAccumulators(jobEvent.getJobID()));
-			
-			wrt.write("\n\"accumulators\": [");
-			int i = 0;
-			for( Entry<String, Object> accumulator : accMap.entrySet()) {
-				wrt.write("{ \"name\": \""+accumulator.getKey()+" (" + accumulator.getValue().getClass().getName()+")\","
-						+ " \"value\": \""+accumulator.getValue().toString()+"\"}\n");
-				if(++i < accMap.size()) {
-					wrt.write(",");
-				}
-			}
-			wrt.write("],\n");
-			
-			wrt.write("\"groupverticetimes\": {");
-			first = true;
-			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-				
-				if(first) {
-					first = false;
-				} else {
-					wrt.write(","); }
-				
-				// Calculate start and end time for groupvertex
-				long started = Long.MAX_VALUE;
-				long ended = 0;
-				
-				// Take earliest running state and latest endstate of groupmembers
-				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-					
-					long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
-					if (running != 0 && running < started) {
-						started = running;
-					}
-					
-					long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
-					long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
-					long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
-					
-					if(finished != 0 && finished > ended) {
-						ended = finished;
-					}
-					
-					if(canceled != 0 && canceled > ended) {
-						ended = canceled;
-					}
-					
-					if(failed != 0 && failed > ended) {
-						ended = failed;
-					}
-
-				}
-				
-				wrt.write("\""+groupVertex.getJobVertexId()+"\": {");
-				wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
-				wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
-				wrt.write("\"STARTED\": "+ started + ",");
-				wrt.write("\"ENDED\": "+ ended);
-				wrt.write("}");
-				
-			}
-
-			wrt.write("}");
-			
-			wrt.write("}");
-			
-			
-		wrt.write("]");
-		
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-		
-	}
-	
-	
-	/**
-	 * Writes all updates (events) for a given job since a given time
-	 * 
-	 * @param wrt
-	 * @param jobId
-	 */
-	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
-		
-		try {
-			
-			List<AbstractEvent> events = jobmanager.getEvents(jobId);
-			
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + jobId + "\",");
-			wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
-			wrt.write("\"recentjobs\": [");
-				
-			boolean first = true;
-			for(RecentJobEvent rje: jobmanager.getRecentJobs()) {
-				if(first) {
-					first = false;
-				} else {
-					wrt.write(","); }
-				
-				wrt.write("\""+rje.getJobID().toString()+"\""); 
-			}
-					
-			wrt.write("],");
-			
-			wrt.write("\"vertexevents\": [");
-		
-			first = true;
-			for (AbstractEvent event: events) {
-				
-				if (event instanceof ExecutionStateChangeEvent) {
-					
-					if(first) {
-						first = false;
-					} else {
-						wrt.write(","); }
-					
-					ExecutionStateChangeEvent vertexevent = (ExecutionStateChangeEvent) event;
-					wrt.write("{");
-					wrt.write("\"vertexid\": \"" + vertexevent.getExecutionAttemptID() + "\",");
-					wrt.write("\"newstate\": \"" + vertexevent.getNewExecutionState() + "\",");
-					wrt.write("\"timestamp\": \"" + vertexevent.getTimestamp() + "\"");
-					wrt.write("}");
-				}
-			}
-			
-			wrt.write("],");
-			
-			wrt.write("\"jobevents\": [");
-			
-			first = true;
-			for(AbstractEvent event: events) {
-				
-				if( event instanceof JobEvent) {
-					
-					if(first) {
-						first = false;
-					} else {
-						wrt.write(","); }
-					
-					JobEvent jobevent = (JobEvent) event;
-					wrt.write("{");
-					wrt.write("\"newstate\": \"" + jobevent.getCurrentJobStatus() + "\",");
-					wrt.write("\"timestamp\": \"" + jobevent.getTimestamp() + "\"");
-					wrt.write("}");
-				}
-			}
-			
-			wrt.write("]");
-			
-			wrt.write("}");
-			
-		
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client	
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		} 
-		
-	}
-	
-	/**
-	 * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
-	 */
-	private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent jobEvent, JobVertexID vertexId) {
-		try {
-			ExecutionGraph graph = jobmanager.getRecentExecutionGraph(jobEvent.getJobID());
-			
-			ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
-			
-			// Serialize ManagementGraph to json
-			wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
-			
-			wrt.write("\"verticetimes\": {");
-			boolean first = true;
-			for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
-				
-				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-					
-					Execution exec = vertex.getCurrentExecutionAttempt();
-					
-					if(first) {
-						first = false;
-					} else {
-						wrt.write(","); }
-					
-					wrt.write("\""+exec.getAttemptId() +"\": {");
-					wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
-					wrt.write("\"vertexname\": \"" + vertex + "\",");
-					wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
-					wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
-					wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
-					wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
-					wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
-					wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
-					wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
-					wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
-					wrt.write("}");
-				}
-				
-			}
-			wrt.write("}}");
-			
-		}
-		catch (IOException ioe) { // Connection closed by client
-			String message = "Info server for jobmanager: Connection closed by client - " + ioe.getClass().getSimpleName();
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(message, ioe);
-			}
-			else if (LOG.isInfoEnabled()) {
-				LOG.info(message);
-			}
-		} 
-	}
-	
-	/**
-	 * Writes the version and the revision of Flink.
-	 * 
-	 * @param wrt
-	 */
-	private void writeJsonForVersion(PrintWriter wrt) {
-		wrt.write("{");
-		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
-		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
-		wrt.write("}");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 1469a4c..0447b00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.util.StringUtils;
 
@@ -39,7 +39,7 @@ public class JsonFactory {
 		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
 		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
 		
-		AllocatedSlot slot = vertex.getCurrentAssignedResource();
+		SimpleSlot slot = vertex.getCurrentAssignedResource();
 		String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname();
 		
 		json.append("\"vertexinstancename\": \"" + instanceName + "\"");

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 9766ab6..31800e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -124,7 +124,7 @@ public class WebInfoServer {
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
-		servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager)), "/jobsInfo");
+		servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager)), "/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
 		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo");
 		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
index 241d6cd..f9126fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
@@ -26,13 +26,12 @@ import java.util.Set;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
 import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
 
-
 public class JobProfilingData {
 
 	private final ExecutionGraph executionGraph;
@@ -59,7 +58,7 @@ public class JobProfilingData {
 	public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) {
 
 		for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
-			AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
+			SimpleSlot slot = executionVertex.getCurrentAssignedResource();
 			if (slot != null && slot.getInstance().getInstanceConnectionInfo().equals(
 					instanceProfilingData.getInstanceConnectionInfo()))
 			{
@@ -76,7 +75,7 @@ public class JobProfilingData {
 		final Set<Instance> tempSet = new HashSet<Instance>();
 		
 		for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
-			AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
+			SimpleSlot slot = executionVertex.getCurrentAssignedResource();
 			if (slot != null) {
 				tempSet.add(slot.getInstance());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1c23293..e6e5287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -423,7 +423,7 @@ public class TaskManager implements TaskOperationProtocol {
 		LOG.info("Shutting down TaskManager");
 		
 		cancelAndClearEverything(new Exception("Task Manager is shutting down"));
-		
+
 		// first, stop the heartbeat thread and wait for it to terminate
 		this.heartbeatThread.interrupt();
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 14ce15a..cf63e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,8 +38,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -117,7 +117,7 @@ public class ExecutionGraphDeploymentTest {
 			});
 			
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(jobId);
+			final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 30b05ee..c685ec0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -32,11 +32,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -67,7 +67,7 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 	
-	public static void setVertexResource(ExecutionVertex vertex, AllocatedSlot slot) {
+	public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) {
 		try {
 			Execution exec = vertex.getCurrentExecutionAttempt();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 2848466..f0c9342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -56,7 +56,7 @@ public class ExecutionStateProgressTest {
 			// mock resources and mock taskmanager
 			TaskOperationProtocol taskManager = getSimpleAcknowledgingTaskmanager();
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				AllocatedSlot slot = getInstance(taskManager).allocateSlot(jid);
+				SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid);
 				ee.deployToSlot(slot);
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9769529..ff1ef5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -120,7 +120,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true), new TaskOperationResult(execId, false));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			vertex.deployToSlot(slot);
 			
@@ -193,7 +193,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, false), new TaskOperationResult(execId, true));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			vertex.deployToSlot(slot);
 			
@@ -260,7 +260,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
@@ -301,7 +301,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
@@ -353,7 +353,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, false));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
@@ -389,7 +389,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed"));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
@@ -428,7 +428,7 @@ public class ExecutionVertexCancelTest {
 			when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed"));
 			
 			Instance instance = getInstance(taskManager);
-			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
@@ -484,7 +484,7 @@ public class ExecutionVertexCancelTest {
 			try {
 				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 				Instance instance = getInstance(taskManager);
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 				
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -526,7 +526,7 @@ public class ExecutionVertexCancelTest {
 				
 				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 				Instance instance = getInstance(taskManager);
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 				
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -540,7 +540,7 @@ public class ExecutionVertexCancelTest {
 				
 				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 				Instance instance = getInstance(taskManager);
-				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 				
 				setVertexResource(vertex, slot);
 				setVertexState(vertex, ExecutionState.CANCELING);

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index f3081bc..5c7d58c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -30,8 +30,8 @@ import static org.mockito.Matchers.any;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
@@ -52,7 +52,7 @@ public class ExecutionVertexDeploymentTest {
 			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
@@ -88,7 +88,7 @@ public class ExecutionVertexDeploymentTest {
 			// mock taskmanager to simply accept the call
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
@@ -133,7 +133,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid);
 			
@@ -189,7 +189,7 @@ public class ExecutionVertexDeploymentTest {
 			// mock taskmanager to simply accept the call
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
@@ -225,7 +225,7 @@ public class ExecutionVertexDeploymentTest {
 			// mock taskmanager to simply accept the call
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid);
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
@@ -269,7 +269,7 @@ public class ExecutionVertexDeploymentTest {
 			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
@@ -306,7 +306,7 @@ public class ExecutionVertexDeploymentTest {
 			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, queue);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index e2f0ee8..e1e4549 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -25,8 +25,8 @@ import static org.mockito.Mockito.*;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
 			// a slot than cannot be deployed to
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			slot.cancel();
 			assertFalse(slot.isReleased());
 			
@@ -80,7 +80,7 @@ public class ExecutionVertexSchedulingTest {
 			// a slot than cannot be deployed to
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			slot.cancel();
 			assertFalse(slot.isReleased());
 			
@@ -119,7 +119,7 @@ public class ExecutionVertexSchedulingTest {
 			// a slot than cannot be deployed to
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
-			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
index a9af2cf..a321ad8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
@@ -35,7 +35,7 @@ public class AllocatedSlotTest {
 		try {
 			// cancel, then release
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				assertTrue(slot.isAlive());
 				
 				slot.cancel();
@@ -51,7 +51,7 @@ public class AllocatedSlotTest {
 			
 			// release immediately
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				assertTrue(slot.isAlive());
 				
 				slot.releaseSlot();
@@ -74,32 +74,32 @@ public class AllocatedSlotTest {
 			
 			// assign to alive slot
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				
 				assertTrue(slot.setExecutedVertex(ev));
-				assertEquals(ev, slot.getExecutedVertex());
+				assertEquals(ev, slot.getExecution());
 				
 				// try to add another one
 				assertFalse(slot.setExecutedVertex(ev_2));
-				assertEquals(ev, slot.getExecutedVertex());
+				assertEquals(ev, slot.getExecution());
 			}
 			
 			// assign to canceled slot
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				slot.cancel();
 				
 				assertFalse(slot.setExecutedVertex(ev));
-				assertNull(slot.getExecutedVertex());
+				assertNull(slot.getExecution());
 			}
 			
 			// assign to released
 			{
-				AllocatedSlot slot = getSlot();
+				SimpleSlot slot = getSlot();
 				slot.releaseSlot();
 				
 				assertFalse(slot.setExecutedVertex(ev));
-				assertNull(slot.getExecutedVertex());
+				assertNull(slot.getExecution());
 			}
 		}
 		catch (Exception e) {
@@ -113,9 +113,9 @@ public class AllocatedSlotTest {
 		try {
 			Execution ev = mock(Execution.class);
 			
-			AllocatedSlot slot = getSlot();
+			SimpleSlot slot = getSlot();
 			assertTrue(slot.setExecutedVertex(ev));
-			assertEquals(ev, slot.getExecutedVertex());
+			assertEquals(ev, slot.getExecution());
 			
 			slot.cancel();
 			slot.releaseSlot();
@@ -129,12 +129,12 @@ public class AllocatedSlotTest {
 		}
 	}
 	
-	public static AllocatedSlot getSlot() throws Exception {
+	public static SimpleSlot getSlot() throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
 		
 		Instance instance = new Instance(connection, new InstanceID(), hardwareDescription, 1);
-		return instance.allocateSlot(new JobID());
+		return instance.allocateSimpleSlot(new JobID());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 3a80357..5e008b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -44,10 +44,10 @@ public class InstanceTest {
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 			
-			AllocatedSlot slot1 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot2 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot3 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot4 = instance.allocateSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
 			
 			assertNotNull(slot1);
 			assertNotNull(slot2);
@@ -60,7 +60,7 @@ public class InstanceTest {
 					slot3.getSlotNumber() + slot4.getSlotNumber());
 			
 			// no more slots
-			assertNull(instance.allocateSlot(new JobID()));
+			assertNull(instance.allocateSimpleSlot(new JobID()));
 			try {
 				instance.returnAllocatedSlot(slot2);
 				fail("instance accepted a non-cancelled slot.");
@@ -108,9 +108,9 @@ public class InstanceTest {
 			
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 			
-			AllocatedSlot slot1 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot2 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot3 = instance.allocateSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
 			
 			instance.markDead();
 			
@@ -138,9 +138,9 @@ public class InstanceTest {
 			
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 			
-			AllocatedSlot slot1 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot2 = instance.allocateSlot(new JobID());
-			AllocatedSlot slot3 = instance.allocateSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
 			
 			instance.cancelAndReleaseAllSlots();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index cc767c2..5e93cbe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -103,4 +103,20 @@ public class JobManagerTestUtils {
 			}
 		}
 	}
+
+	public static void killRandomHeartbeatThread() throws InterruptedException {
+		Thread[] threads = new Thread[Thread.activeCount()];
+		Thread.enumerate(threads);
+
+		for (Thread t : threads) {
+			if (t == null) {
+				continue;
+			}
+			if (t.getName().equals("Heartbeat Thread")) {
+				t.stop();
+				t.join();
+				return;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
index 6b8be15..888da9c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.killRandomHeartbeatThread;
 import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
 import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
 import static org.junit.Assert.*;
@@ -112,4 +113,65 @@ public class TaskManagerFailsITCase {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testExecutionWithFailingHeartbeat() {
+		final int NUM_TASKS = 31;
+
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(BlockingReceiver.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+			final JobManager jm = startJobManager(2, NUM_TASKS);
+
+			try {
+
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+				// wait until everyone has settled in
+				long deadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < deadline) {
+
+					boolean allrunning = true;
+					for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+						if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+							allrunning = false;
+							break;
+						}
+					}
+
+					if (allrunning) {
+						break;
+					}
+					Thread.sleep(200);
+				}
+
+				// kill one task manager's heartbeat
+				killRandomHeartbeatThread();
+
+				eg.waitForJobEnd();
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
index 239ab9d..5680f47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
 import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.killRandomHeartbeatThread;
 import static org.junit.Assert.*;
 
 import org.apache.flink.runtime.client.AbstractJobResult;
@@ -115,4 +116,67 @@ public class TaskManagerFailsWithSlotSharingITCase {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testExecutionWithFailingHeartbeat() {
+		final int NUM_TASKS = 20;
+
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(BlockingReceiver.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+			final JobManager jm = startJobManager(2, NUM_TASKS / 2);
+
+			try {
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+				// wait until everyone has settled in
+				long deadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < deadline) {
+
+					boolean allrunning = true;
+					for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+						if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+							allrunning = false;
+							break;
+						}
+					}
+
+					if (allrunning) {
+						break;
+					}
+					Thread.sleep(200);
+				}
+
+				killRandomHeartbeatThread();
+
+				eg.waitForJobEnd();
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }


Mime
View raw message