flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/63] [abbrv] Refactor job graph construction to incremental attachment based
Date Sun, 21 Sep 2014 02:12:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
deleted file mode 100644
index 08a03bc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-
-/**
- * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobOutputVertex must not have any further output.
- */
-public class OutputFormatOutputVertex extends AbstractJobOutputVertex {
-	/**
-	 * Contains the output format associated to this output vertex. It can be <pre>null</pre>.
-	 */
-	private OutputFormat<?> outputFormat;
-
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public OutputFormatOutputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-	
-	public OutputFormatOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public OutputFormatOutputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}
-	
-	public void setOutputFormat(OutputFormat<?> format) {
-		this.outputFormat = format;
-	}
-	
-	public void initializeOutputFormatFromTaskConfig(ClassLoader cl) {
-		TaskConfig cfg = new TaskConfig(getConfiguration());
-		UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(cl);
-		
-		if (wrapper != null) {
-			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl);
-			this.outputFormat.configure(cfg.getStubParameters());
-		}
-	}
-
-	/**
-	 * Returns the output format. It can also be <pre>null</pre>.
-	 *
-	 * @return output format or <pre>null</pre>
-	 */
-	public OutputFormat<?> getOutputFormat() { return outputFormat; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
new file mode 100644
index 0000000..029d109
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+
+/**
+ * A task vertex that runs an initialization on the master, trying to deserialize an output format
+ * and initializing it on the master, if necessary.
+ */
+public class OutputFormatVertex extends AbstractJobVertex {
+	
+	private static final long serialVersionUID = 1L;
+	
+	
+	/** Caches the output format associated to this output vertex. */
+	private transient OutputFormat<?> outputFormat;
+
+	/**
+	 * Creates a new task vertex with the specified name.
+	 * 
+	 * @param name The name of the task vertex.
+	 */
+	public OutputFormatVertex(String name) {
+		super(name);
+	}
+	
+	
+	@Override
+	public void initializeOnMaster(ClassLoader loader) throws Exception {
+		if (this.outputFormat == null) {
+			TaskConfig cfg = new TaskConfig(getConfiguration());
+			UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+		
+			if (wrapper == null) {
+				throw new Exception("No output format present in OutputFormatVertex's task configuration.");
+			}
+
+			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+			this.outputFormat.configure(cfg.getStubParameters());
+		}
+		
+		if (this.outputFormat instanceof InitializeOnMaster) {
+			((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
deleted file mode 100644
index 3699f0e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-import org.apache.flink.core.io.InputSplit;
-
-
-public class SimpleInputVertex extends AbstractJobInputVertex {
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public SimpleInputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-	
-	public SimpleInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public SimpleInputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}
-
-	@Override
-	public Class<? extends InputSplit> getInputSplitType() {
-		return null;
-	}
-
-	@Override
-	public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
-		return null;
-	}	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
deleted file mode 100644
index 8709a07..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-/**
- * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobOutputVertex must not have any further output.
- */
-public class SimpleOutputVertex extends AbstractJobOutputVertex {
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public SimpleOutputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-	
-	public SimpleOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public SimpleOutputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index d3ad516..aab9c89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -29,7 +29,7 @@ public abstract class AbstractInvokable {
 	/**
 	 * The environment assigned to this invokable.
 	 */
-	private volatile Environment environment = null;
+	private volatile Environment environment;
 
 	/**
 	 * Must be overwritten by the concrete task to instantiate the required record reader and record writer.
@@ -60,7 +60,6 @@ public abstract class AbstractInvokable {
 	 * 
 	 * @return the environment of this task or <code>null</code> if the environment has not yet been set
 	 */
-	// TODO: This method should be final
 	public Environment getEnvironment() {
 		return this.environment;
 	}
@@ -72,7 +71,6 @@ public abstract class AbstractInvokable {
 	 * @return the current number of subtasks the respective task is split into
 	 */
 	public final int getCurrentNumberOfSubtasks() {
-
 		return this.environment.getCurrentNumberOfSubtasks();
 	}
 
@@ -82,7 +80,6 @@ public abstract class AbstractInvokable {
 	 * @return the index of this subtask in the subtask group
 	 */
 	public final int getIndexInSubtaskGroup() {
-
 		return this.environment.getIndexInSubtaskGroup();
 	}
 
@@ -92,7 +89,6 @@ public abstract class AbstractInvokable {
 	 * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}
 	 */
 	public final Configuration getTaskConfiguration() {
-
 		return this.environment.getTaskConfiguration();
 	}
 
@@ -102,40 +98,10 @@ public abstract class AbstractInvokable {
 	 * @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}
 	 */
 	public final Configuration getJobConfiguration() {
-
 		return this.environment.getJobConfiguration();
 	}
 
 	/**
-	 * This method should be called by the user code if a custom
-	 * user thread has been started.
-	 * 
-	 * @param userThread
-	 *        the user thread which has been started
-	 */
-	public final void userThreadStarted(Thread userThread) {
-
-		if (this.environment != null) {
-			this.environment.userThreadStarted(userThread);
-		}
-
-	}
-
-	/**
-	 * This method should be called by the user code if a custom
-	 * user thread has finished.
-	 * 
-	 * @param userThread
-	 *        the user thread which has finished
-	 */
-	public final void userThreadFinished(Thread userThread) {
-
-		if (this.environment != null) {
-			this.environment.userThreadFinished(userThread);
-		}
-	}
-
-	/**
 	 * This method is called when a task is canceled either as a result of a user abort or an execution failure. It can
 	 * be overwritten to respond to shut down the user code properly.
 	 * 
@@ -143,7 +109,6 @@ public abstract class AbstractInvokable {
 	 *         thrown if any exception occurs during the execution of the user code
 	 */
 	public void cancel() throws Exception {
-
 		// The default implementation does nothing.
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
index 7aa3374..94e6cab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import org.apache.flink.core.io.InputSplit;
 
 /**
- * The input split iterator allows an {@link AbstractInputTask} to iterator over all input splits it is supposed to
+ * The input split iterator allows a task to iterate over all input splits it is supposed to
  * consume. Internally, the input split iterator calls an {@link InputSplitProvider} on each <code>next</code> call in
  * order to facilitate lazy split assignment.
  * 
@@ -72,7 +72,6 @@ public class InputSplitIterator<T extends InputSplit> implements Iterator<T> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public T next() {
-
 		T retVal = null;
 
 		if (this.nextInputSplit == null) {
@@ -88,8 +87,6 @@ public class InputSplitIterator<T extends InputSplit> implements Iterator<T> {
 
 	@Override
 	public void remove() {
-
 		throw new RuntimeException("The InputSplitIterator does not implement the remove method");
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
index 22722e7..20a4ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
@@ -16,23 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.io.InputSplit;
 
 /**
- * An input split provider can be successively queried to provide a series of {@link InputSplit} objects an
- * {@link AbstractInputTask} is supposed to consume in the course of its execution.
- * 
+ * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
+ * task is supposed to consume in the course of its execution.
  */
 public interface InputSplitProvider {
 
 	/**
-	 * Requests the next input split to be consumed by the calling {@link AbstractInputTask}.
+	 * Requests the next input split to be consumed by the calling task.
 	 * 
-	 * @return the next input split to be consumed by the calling {@link AbstractInputTask} or <code>null</code> if the
-	 *         {@link AbstractInputTask} shall not consume any further input splits.
+	 * @return the next input split to be consumed by the calling task or <code>null</code> if the
+	 *         task shall not consume any further input splits.
 	 */
 	InputSplit getNextInputSplit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java
deleted file mode 100644
index b8d9557..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import java.util.List;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * A deployment manager is responsible for deploying a list of {@link ExecutionVertex} objects the given
- * {@link org.apache.flink.runtime.instance.Instance}. It is called by a {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler} implementation whenever at least one
- * {@link ExecutionVertex} has become ready to be executed.
- * 
- */
-public interface DeploymentManager {
-
-	/**
-	 * Deploys the list of vertices on the given {@link org.apache.flink.runtime.instance.Instance}.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the vertices to be deployed belong to
-	 * @param instance
-	 *        the instance on which the vertices shall be deployed
-	 * @param verticesToBeDeployed
-	 *        the list of vertices to be deployed
-	 */
-	void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index 6800a68..d36659f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager;
 
 import java.util.ArrayList;
@@ -32,27 +31,20 @@ import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
 import org.apache.flink.runtime.event.job.JobEvent;
 import org.apache.flink.runtime.event.job.ManagementEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.event.job.VertexAssignmentEvent;
 import org.apache.flink.runtime.event.job.VertexEvent;
 import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.ManagementGraphFactory;
-import org.apache.flink.runtime.executiongraph.VertexAssignmentListener;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
 import org.apache.flink.runtime.managementgraph.ManagementGraph;
 import org.apache.flink.runtime.managementgraph.ManagementVertex;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
 import org.apache.flink.runtime.profiling.ProfilingListener;
 import org.apache.flink.runtime.profiling.types.ProfilingEvent;
 
@@ -62,7 +54,6 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
  * the event collector removes all intervals which are older than the interval.
  * <p>
  * This class is thread-safe.
- * 
  */
 public final class EventCollector extends TimerTask implements ProfilingListener {
 
@@ -72,85 +63,43 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 * the data provided by the <code>executionStateChanged</code> callback method.
 	 * However, these IDs are needed to create the construct the {@link VertexEvent} and the
 	 * {@link ExecutionStateChangeEvent}.
-	 * 
 	 */
 	private static final class ExecutionListenerWrapper implements ExecutionListener {
 
-		/**
-		 * The event collector to forward the created event to.
-		 */
+		/** The event collector to forward the created event to. */
 		private final EventCollector eventCollector;
 
-		/**
-		 * The vertex this listener belongs to.
-		 */
-		private final ExecutionVertex vertex;
+		private final ExecutionGraph graph;
+		
 
-		/**
-		 * Constructs a new execution listener object.
-		 * 
-		 * @param eventCollector
-		 *        the event collector to forward the created event to
-		 * @param vertex
-		 *        the vertex this listener belongs to.
-		 */
-		public ExecutionListenerWrapper(final EventCollector eventCollector, final ExecutionVertex vertex) {
+		public ExecutionListenerWrapper(EventCollector eventCollector, ExecutionGraph graph) {
 			this.eventCollector = eventCollector;
-			this.vertex = vertex;
+			this.graph = graph;
 		}
 
-		/**
-		 * {@inheritDoc}
-		 */
 		@Override
-		public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
-				final ExecutionState newExecutionState, final String optionalMessage) {
-
+		public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
+				ExecutionState2 newExecutionState, String optionalMessage)
+		{
 			final long timestamp = System.currentTimeMillis();
 
-			final JobVertexID jobVertexID = this.vertex.getGroupVertex().getJobVertexID();
-			final String taskName = this.vertex.getGroupVertex().getName();
-			final int totalNumberOfSubtasks = this.vertex.getGroupVertex().getCurrentNumberOfGroupMembers();
-			final int indexInSubtaskGroup = this.vertex.getIndexInVertexGroup();
+			final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
+			
+			final String taskName = vertex == null ? "(null)" : vertex.getJobVertex().getName();
+			final int totalNumberOfSubtasks = vertex == null ? -1 : vertex.getParallelism();
 
 			// Create a new vertex event
-			final VertexEvent vertexEvent = new VertexEvent(timestamp, jobVertexID, taskName, totalNumberOfSubtasks,
-				indexInSubtaskGroup, newExecutionState, optionalMessage);
+			final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks,
+					subtask, newExecutionState, optionalMessage);
 
 			this.eventCollector.addEvent(jobID, vertexEvent);
 
 			final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp,
-				vertexID.toManagementVertexID(), newExecutionState);
+					vertexId.toManagementVertexId(subtask), newExecutionState);
 
 			this.eventCollector.updateManagementGraph(jobID, executionStateChangeEvent, optionalMessage);
 			this.eventCollector.addEvent(jobID, executionStateChangeEvent);
 		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-			// Nothing to do here
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-			// Nothing to do here
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public int getPriority() {
-
-			return 20;
-		}
-
 	}
 
 	/**
@@ -162,24 +111,16 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 */
 	private static final class JobStatusListenerWrapper implements JobStatusListener {
 
-		/**
-		 * The event collector to forward the created event to.
-		 */
+		/** The event collector to forward the created event to. */
 		private final EventCollector eventCollector;
 
-		/**
-		 * The name of the job this wrapper has been created for.
-		 */
+		/** The name of the job this wrapper has been created for. */
 		private final String jobName;
 
-		/**
-		 * <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise.
-		 */
+		/** <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise. */
 		private final boolean isProfilingAvailable;
 
-		/**
-		 * The time stamp of the job submission
-		 */
+		/** The time stamp of the job submission. */
 		private final long submissionTimestamp;
 
 		/**
@@ -194,101 +135,32 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 		 * @param submissionTimestamp
 		 *        the submission time stamp of the job
 		 */
-		public JobStatusListenerWrapper(final EventCollector eventCollector, final String jobName,
-				final boolean isProfilingAvailable, final long submissionTimestamp) {
-
+		public JobStatusListenerWrapper(EventCollector eventCollector, String jobName,
+				boolean isProfilingAvailable, long submissionTimestamp)
+		{
 			this.eventCollector = eventCollector;
 			this.jobName = jobName;
 			this.isProfilingAvailable = isProfilingAvailable;
 			this.submissionTimestamp = submissionTimestamp;
 		}
 
-		/**
-		 * {@inheritDoc}
-		 */
 		@Override
-		public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-				final String optionalMessage) {
+		public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
 
 			final JobID jobID = executionGraph.getJobID();
 
-			if (newJobStatus == InternalJobStatus.SCHEDULED) {
+			if (newJobStatus == JobStatus.RUNNING) {
 
 				final ManagementGraph managementGraph = ManagementGraphFactory.fromExecutionGraph(executionGraph);
 				this.eventCollector.addManagementGraph(jobID, managementGraph);
 			}
 
 			// Update recent job event
-			final JobStatus jobStatus = InternalJobStatus.toJobStatus(newJobStatus);
-			if (jobStatus != null) {
-				this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable,
-					this.submissionTimestamp, jobStatus);
-
-				this.eventCollector.addEvent(jobID,
-					new JobEvent(System.currentTimeMillis(), jobStatus, optionalMessage));
-			}
-		}
-	}
-
-	/**
-	 * The vertex assignment listener wrapper is an auxiliary class. It is required
-	 * because the job ID cannot be accessed from the data provided by the <code>vertexAssignmentChanged</code> callback
-	 * method. However, this job ID is needed to prepare the {@link VertexAssignmentEvent} for transmission.
-	 * 
-	 */
-	private static final class VertexAssignmentListenerWrapper implements VertexAssignmentListener {
-
-		/**
-		 * The event collector to forward the created event to.
-		 */
-		private final EventCollector eventCollector;
-
-		/**
-		 * The ID the job this wrapper has been created for.
-		 */
-		private final JobID jobID;
-
-		/**
-		 * Constructs a new vertex assignment listener wrapper.
-		 * 
-		 * @param eventCollector
-		 *        the event collector to forward the events to
-		 * @param jobID
-		 *        the ID of the job
-		 */
-		public VertexAssignmentListenerWrapper(final EventCollector eventCollector, final JobID jobID) {
-			this.eventCollector = eventCollector;
-			this.jobID = jobID;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void vertexAssignmentChanged(final ExecutionVertexID id, final AllocatedResource newAllocatedResource) {
-
-			// Create a new vertex assignment event
-			final ManagementVertexID managementVertexID = id.toManagementVertexID();
-			final long timestamp = System.currentTimeMillis();
-
-			final Instance instance = newAllocatedResource.getInstance();
-			VertexAssignmentEvent event;
-			if (instance == null) {
-				event = new VertexAssignmentEvent(timestamp, managementVertexID, "null");
-			} else {
-
-				String instanceName = null;
-				if (instance.getInstanceConnectionInfo() != null) {
-					instanceName = instance.getInstanceConnectionInfo().toString();
-				} else {
-					instanceName = instance.toString();
-				}
+			this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable,
+					this.submissionTimestamp, newJobStatus);
 
-				event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName);
-			}
-
-			this.eventCollector.updateManagementGraph(jobID, event);
-			this.eventCollector.addEvent(this.jobID, event);
+			this.eventCollector.addEvent(jobID,
+					new JobEvent(System.currentTimeMillis(), newJobStatus, optionalMessage));
 		}
 	}
 
@@ -344,8 +216,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 *        <code>true</code> if {@link ManagementEvent} objects shall be added to the list as well,
 	 *        <code>false</code> otherwise
 	 */
-	public void getEventsForJob(final JobID jobID, final List<AbstractEvent> eventList,
-			final boolean includeManagementEvents) {
+	public void getEventsForJob(JobID jobID, List<AbstractEvent> eventList, boolean includeManagementEvents) {
 
 		synchronized (this.collectedEvents) {
 
@@ -431,15 +302,15 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 * @param jobStatus
 	 *        the status of the job
 	 */
-	private void updateRecentJobEvent(final JobID jobID, final String jobName, final boolean isProfilingEnabled,
-			final long submissionTimestamp, final JobStatus jobStatus) {
-
+	private void updateRecentJobEvent(JobID jobID, String jobName, boolean isProfilingEnabled,
+			long submissionTimestamp, JobStatus jobStatus)
+	{
 		final long currentTime = System.currentTimeMillis();
+		
 		final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled,
 			submissionTimestamp, currentTime);
 
 		synchronized (this.recentJobs) {
-
 			this.recentJobs.put(jobID, recentJobEvent);
 		}
 	}
@@ -448,7 +319,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 * Registers a job in form of its execution graph representation
 	 * with the job progress collector. The collector will subscribe
 	 * to state changes of the individual subtasks. A separate
-	 * deregistration is not necessary since the job progress collector
+	 * de-registration is not necessary since the job progress collector
 	 * periodically discards outdated progress information.
 	 * 
 	 * @param executionGraph
@@ -458,26 +329,12 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 * @param submissionTimestamp
 	 *        the submission time stamp of the job
 	 */
-	public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable,
-			final long submissionTimestamp) {
-
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(executionGraph, true);
-
-		while (it.hasNext()) {
+	public void registerJob(ExecutionGraph executionGraph, boolean profilingAvailable, long submissionTimestamp) {
 
-			final ExecutionVertex vertex = it.next();
+		executionGraph.registerExecutionListener(new ExecutionListenerWrapper(this, executionGraph));
 
-			// Register the listener object which will pass state changes on to the collector
-			vertex.registerExecutionListener(new ExecutionListenerWrapper(this, vertex));
-
-			// Register the listener object which will pass assignment changes on to the collector
-			vertex.registerVertexAssignmentListener(new VertexAssignmentListenerWrapper(this, executionGraph.getJobID()));
-		}
-
-		// Register one job status listener wrapper for the entire job
 		executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(),
 			profilingAvailable, submissionTimestamp));
-
 	}
 
 	/**
@@ -547,7 +404,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 
 	@Override
 	public void processProfilingEvents(final ProfilingEvent profilingEvent) {
-
 		// Simply add profiling events to the job's event queue
 		addEvent(profilingEvent.getJobID(), profilingEvent);
 	}
@@ -561,7 +417,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 *        the management graph to be added
 	 */
 	void addManagementGraph(final JobID jobID, final ManagementGraph managementGraph) {
-
 		synchronized (this.recentManagementGraphs) {
 			this.recentManagementGraphs.put(jobID, managementGraph);
 		}
@@ -576,38 +431,12 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 * @return the management graph for the job with the given ID or <code>null</code> if no such graph exists
 	 */
 	public ManagementGraph getManagementGraph(final JobID jobID) {
-
 		synchronized (this.recentManagementGraphs) {
 			return this.recentManagementGraphs.get(jobID);
 		}
 	}
 
 	/**
-	 * Applies changes in the vertex assignment to the stored management graph.
-	 * 
-	 * @param jobID
-	 *        the ID of the job whose management graph shall be updated
-	 * @param vertexAssignmentEvent
-	 *        the event describing the changes in the vertex assignment
-	 */
-	private void updateManagementGraph(final JobID jobID, final VertexAssignmentEvent vertexAssignmentEvent) {
-
-		synchronized (this.recentManagementGraphs) {
-
-			final ManagementGraph managementGraph = this.recentManagementGraphs.get(jobID);
-			if (managementGraph == null) {
-				return;
-			}
-			final ManagementVertex vertex = managementGraph.getVertexByID(vertexAssignmentEvent.getVertexID());
-			if (vertex == null) {
-				return;
-			}
-
-			vertex.setInstanceName(vertexAssignmentEvent.getInstanceName());
-		}
-	}
-
-	/**
 	 * Applies changes in the state of an execution vertex to the stored management graph.
 	 * 
 	 * @param jobID
@@ -615,7 +444,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 	 * @param executionStateChangeEvent
 	 *        the event describing the changes in the execution state of the vertex
 	 */
-	private void updateManagementGraph(final JobID jobID, final ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) {
+	private void updateManagementGraph(JobID jobID, ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) {
 
 		synchronized (this.recentManagementGraphs) {
 
@@ -629,7 +458,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 			}
 
 			vertex.setExecutionState(executionStateChangeEvent.getNewExecutionState());
-			if (executionStateChangeEvent.getNewExecutionState() == ExecutionState.FAILED) {
+			if (optionalMessage != null) {
 				vertex.setOptMessage(optionalMessage);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 3b76b78..fc76d73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager;
 
 import java.io.File;
@@ -25,11 +24,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -48,61 +46,49 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.io.StringRecord;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorEvent;
 import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode;
 import org.apache.flink.runtime.client.JobCancelResult;
 import org.apache.flink.runtime.client.JobProgressResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.GraphConversionException;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.instance.DefaultInstanceManager;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.InstanceManager;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.ipc.Server;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
 import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
 import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
 import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
 import org.apache.flink.runtime.managementgraph.ManagementGraph;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-import org.apache.flink.runtime.profiling.JobManagerProfiler;
-import org.apache.flink.runtime.profiling.ProfilingUtils;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
 import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
 import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
 import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 import org.apache.flink.runtime.protocols.JobManagerProtocol;
-import org.apache.flink.runtime.taskmanager.AbstractTaskResult;
-import org.apache.flink.runtime.taskmanager.TaskCancelResult;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskKillResult;
-import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -113,15 +99,13 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+import com.google.common.base.Preconditions;
+
 /**
- * In Nephele the job manager is the central component for communication with clients, creating
- * schedules for incoming jobs and supervise their execution. A job manager may only exist once in
- * the system and its address must be known the clients.
- * Task managers can discover the job manager by means of an UDP broadcast and afterwards advertise
- * themselves as new workers for tasks.
- * 
+ * The JobManager is the master that coordinates the distributed execution.
+ * It receives jobs from clients and tracks their distributed execution.
  */
-public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
+public class JobManager implements ExtendedManagementProtocol, InputSplitProviderProtocol,
 		JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
 {
 
@@ -130,32 +114,46 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	private final static int FAILURE_RETURN_CODE = 1;
 	
 	
+	/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
 	private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
 	
-	private final Server jobManagerServer;
 
-	private final EventCollector eventCollector;
-	
-	private final ArchiveListener archive;
+	/** The RPC end point through which the JobManager gets its calls */
+	private final Server jobManagerServer;
 
+	/** Keeps track of the currently available task managers */
 	private final InstanceManager instanceManager;
 	
+	/** Assigns tasks to slots and keeps track of available and allocated task slots */
 	private final DefaultScheduler scheduler;
 	
+	/** The currently running jobs */
+	private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
+	
+	
+	// begin: these will be consolidated / removed 
+	private final EventCollector eventCollector;
+	
+	private final ArchiveListener archive;
+	
 	private final AccumulatorManager accumulatorManager;
-
 	
 	private final int recommendedClientPollingInterval;
-
+	// end: these will be consolidated / removed
+	
 	
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
-
+	
 	private volatile boolean isShutDown;
 	
 	
 	private WebInfoServer server;
 	
 	
+	// --------------------------------------------------------------------------------------------
+	//  Initialization & Shutdown
+	// --------------------------------------------------------------------------------------------
+	
 	public JobManager(ExecutionMode executionMode) throws Exception {
 
 		final String ipcAddressString = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
@@ -190,6 +188,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			this.archive = null;
 		}
 		
+		this.currentJobs = new ConcurrentHashMap<JobID, ExecutionGraph>();
+		
 		// Create the accumulator manager, with same archiving limit as web
 		// interface. We need to store the accumulators for at least one job.
 		// Otherwise they might be deleted before the client requested the
@@ -218,21 +218,15 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			this.instanceManager = new LocalInstanceManager(numTaskManagers);
 		}
 		else if (executionMode == ExecutionMode.CLUSTER) {
-			this.instanceManager = new DefaultInstanceManager();
+			this.instanceManager = new InstanceManager();
 		}
 		else {
 			throw new IllegalArgumentException("ExecutionMode");
 		}
 
-		// Try to load the scheduler for the given execution mode
-		final String schedulerClassName = JobManagerUtils.getSchedulerClassName(executionMode);
-		LOG.info("Trying to load " + schedulerClassName + " as scheduler");
-
-		// Try to get the instance manager class name
-		this.scheduler = JobManagerUtils.loadScheduler(schedulerClassName, this, this.instanceManager);
-		if (this.scheduler == null) {
-			throw new Exception("Unable to load scheduler " + schedulerClassName);
-		}
+		// create the scheduler and make it listen for the availability of new instances
+		this.scheduler = new DefaultScheduler();
+		this.instanceManager.addInstanceListener(this.scheduler);
 	}
 
 	public void shutdown() {
@@ -275,393 +269,223 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		LOG.debug("Shutdown of job manager completed");
 	}
 	
-	/**
-	 * Entry point for the program
-	 * 
-	 * @param args
-	 *        arguments from the command line
-	 */
-	
-	public static void main(String[] args) {
-		// determine if a valid log4j config exists and initialize a default logger if not
-		if (System.getProperty("log4j.configuration") == null) {
-			Logger root = Logger.getRootLogger();
-			root.removeAllAppenders();
-			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-			root.addAppender(appender);
-			root.setLevel(Level.INFO);
-		}
-		
-		JobManager jobManager;
-		try {
-			jobManager = initialize(args);
-			// Start info server for jobmanager
-			jobManager.startInfoServer();
-		}
-		catch (Exception e) {
-			LOG.fatal(e.getMessage(), e);
-			System.exit(FAILURE_RETURN_CODE);
-		}
-		
-		// Clean up is triggered through a shutdown hook
-		// freeze this thread to keep the JVM alive (the job manager threads are daemon threads)
-		Object w = new Object();
-		synchronized (w) {
-			try {
-				w.wait();
-			} catch (InterruptedException e) {}
-		}
-	}
-	
-	@SuppressWarnings("static-access")
-	public static JobManager initialize(String[] args) throws Exception {
-		final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
-			.withDescription("Specify configuration directory.").create("configDir");
-
-		final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg()
-			.withDescription("Specify execution mode.").create("executionMode");
-
-		final Options options = new Options();
-		options.addOption(configDirOpt);
-		options.addOption(executionModeOpt);
-
-		CommandLineParser parser = new GnuParser();
-		CommandLine line = null;
-		try {
-			line = parser.parse(options, args);
-		} catch (ParseException e) {
-			LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
-			System.exit(FAILURE_RETURN_CODE);
-		}
-
-		final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
-		final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local");
-		
-		ExecutionMode executionMode = null;
-		if ("local".equals(executionModeName)) {
-			executionMode = ExecutionMode.LOCAL;
-		} else if ("cluster".equals(executionModeName)) {
-			executionMode = ExecutionMode.CLUSTER;
-		} else {
-			System.err.println("Unrecognized execution mode: " + executionModeName);
-			System.exit(FAILURE_RETURN_CODE);
-		}
-		
-		// print some startup environment info, like user, code revision, etc
-		EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
-		
-		// First, try to load global configuration
-		GlobalConfiguration.loadConfiguration(configDir);
-
-		// Create a new job manager object
-		JobManager jobManager = new JobManager(executionMode);
-		
-		// Set base dir for info server
-		Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
-		if (configDir != null && new File(configDir).isDirectory()) {
-			infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/..");
-		}
-		GlobalConfiguration.includeConfiguration(infoserverConfig);
-		return jobManager;
-	}
-
+	// --------------------------------------------------------------------------------------------
+	//  Job Execution
+	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public JobSubmissionResult submitJob(JobGraph job) throws IOException {
+		
+		boolean success = false;
+		
 		try {
 			// First check if job is null
 			if (job == null) {
 				return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
 			}
 	
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Submitted job " + job.getName() + " is not null");
-			}
-	
-			// Check if any vertex of the graph has null edges
-			AbstractJobVertex jv = job.findVertexWithNullEdges();
-			if (jv != null) {
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Vertex "
-					+ jv.getName() + " has at least one null edge");
-				return result;
+			if (LOG.isInfoEnabled()) {
+				LOG.info(String.format("Received job %s (%s)", job.getJobID(), job.getName()));
 			}
-	
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Submitted job " + job.getName() + " has no null edges");
+			
+			// get the existing execution graph (if we attach), or construct a new empty one to attach
+			ExecutionGraph executionGraph = this.currentJobs.get(job.getJobID());
+			if (executionGraph == null) {
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
+				}
+				
+				executionGraph = new ExecutionGraph(job.getJobID(), job.getName(), job.getJobConfiguration(), this.executorService);
+				ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
+				if (previous != null) {
+					throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());
+				}
 			}
-	
-			// Next, check if the graph is weakly connected
-			if (!job.isWeaklyConnected()) {
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
-					"Job graph is not weakly connected");
-				return result;
+			else {
+				if (LOG.isInfoEnabled()) {
+					LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", job.getJobID()));
+				}
 			}
-	
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("The graph of job " + job.getName() + " is weakly connected");
+			
+			// grab the class loader for user-defined code
+			final ClassLoader userCodeLoader = LibraryCacheManager.getClassLoader(job.getJobID());
+			if (userCodeLoader == null) {
+				throw new JobException("The user code class loader could not be initialized.");
 			}
-	
-			// Check if job graph has cycles
-			if (!job.isAcyclic()) {
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
-					"Job graph is not a DAG");
-				return result;
+			
+			String[] jarFilesForJob = LibraryCacheManager.getRequiredJarFiles(job.getJobID());
+			for (String fileId : jarFilesForJob) {
+				executionGraph.addUserCodeJarFile(fileId);
 			}
-	
+			
+			// first, perform the master initialization of the nodes
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("The graph of job " + job.getName() + " is acyclic");
-			}
-	
-			// Check constrains on degree
-			jv = job.areVertexDegreesCorrect();
-			if (jv != null) {
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
-					"Degree of vertex " + jv.getName() + " is incorrect");
-				return result;
+				LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
 			}
-	
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("All vertices of job " + job.getName() + " have the correct degree");
+			try {
+				for (AbstractJobVertex vertex : job.getVertices()) {
+					// check that the vertex has an executable class
+					String executableClass = vertex.getInvokableClassName();
+					if (executableClass == null || executableClass.length() == 0) {
+						throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
+					}
+					
+					// master side initialization
+					vertex.initializeOnMaster(userCodeLoader);
+				}
 			}
-	
-			if (!job.isInstanceDependencyChainAcyclic()) {
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
-					"The dependency chain for instance sharing contains a cycle");
-	
-				return result;
+			catch (FileNotFoundException e) {
+				LOG.error("File-not-Found: " + e.getMessage());
+				return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
 			}
-	
+			
+			// first topologically sort the job vertices to form the basis of creating the execution graph
+			List<AbstractJobVertex> topoSorted = job.getVerticesSortedTopologicallyFromSources();
+			
+			// then attach the sorted job vertices to the execution graph
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("The dependency chain for instance sharing is acyclic");
+				LOG.debug(String.format("Adding %d vertices from job graph %s (%s)", topoSorted.size(), job.getJobID(), job.getName()));
 			}
-	
-			// Try to create initial execution graph from job graph
-			LOG.info("Creating initial execution graph from job graph " + job.getName());
-			ExecutionGraph eg;
-	
-			try {
-				eg = new ExecutionGraph(job, 1);
-			} catch (GraphConversionException e) {
-				if (e.getCause() == null) {
-					return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
-				} else {
-					Throwable t = e.getCause();
-					if (t instanceof FileNotFoundException) {
-						return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, t.getMessage());
-					} else {
-						return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
-					}
-				}
+			
+			executionGraph.attachJobGraph(topoSorted);
+			
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName()));
 			}
 	
 			// Register job with the progress collector
 			if (this.eventCollector != null) {
-				this.eventCollector.registerJob(eg, false, System.currentTimeMillis());
+				this.eventCollector.registerJob(executionGraph, false, System.currentTimeMillis());
 			}
 	
 			// Register for updates on the job status
-			eg.registerJobStatusListener(this);
+			executionGraph.registerJobStatusListener(this);
 	
 			// Schedule job
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Scheduling job " + job.getName());
 			}
 	
-			try {
-				this.scheduler.scheduleJob(eg);
-			} catch (SchedulingException e) {
-				unregisterJob(eg);
-				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
-				return result;
-			}
+			executionGraph.scheduleForExecution(this.scheduler);
 	
 			// Return on success
+			success = true;
 			return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
 		}
 		catch (Throwable t) {
 			LOG.error("Job submission failed.", t);
 			return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
 		}
-	}
-	
-
-	public InstanceManager getInstanceManager() {
-		return this.instanceManager;
-	}
-
-	/**
-	 * This method is a convenience method to unregister a job from all of
-	 * Nephele's monitoring, profiling and optimization components at once.
-	 * Currently, it is only being used to unregister from profiling (if activated).
-	 * 
-	 * @param executionGraph
-	 *        the execution graph to remove from the job manager
-	 */
-	private void unregisterJob(final ExecutionGraph executionGraph) {
-
-		// Remove job from profiler (if activated)
-		if (this.profiler != null
-			&& executionGraph.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
-			this.profiler.unregisterProfilingJob(executionGraph);
-
-			if (this.eventCollector != null) {
-				this.profiler.unregisterFromProfilingData(executionGraph.getJobID(), this.eventCollector);
-			}
-		}
-
-		// Remove job from input split manager
-		if (this.inputSplitManager != null) {
-			this.inputSplitManager.unregisterJob(executionGraph);
-		}
-
-		// Unregister job with library cache manager
-		try {
-			LibraryCacheManager.unregister(executionGraph.getJobID());
-		} catch (IOException ioe) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn(ioe);
+		finally {
+			if (!success) {
+				this.currentJobs.remove(job.getJobID());
+				
+				try {
+					LibraryCacheManager.unregister(job.getJobID());
+				}
+				catch (IllegalStateException e) {
+					// may happen if the job failed before being registered at the
+					// library cache manager
+				}
+				catch (Throwable t) {
+					LOG.error("Error while de-registering job at library cache manager.", t);
+				}
 			}
 		}
 	}
 
-
 	@Override
-	public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo) {
-
-		// Delegate call to instance manager
-		if (this.instanceManager != null) {
-
-			final Runnable heartBeatRunnable = new Runnable() {
+	public JobCancelResult cancelJob(JobID jobID) throws IOException {
 
-				@Override
-				public void run() {
-					instanceManager.reportHeartBeat(instanceConnectionInfo);
-				}
-			};
+		LOG.info("Trying to cancel job with ID " + jobID);
 
-			this.executorService.execute(heartBeatRunnable);
+		final ExecutionGraph eg = this.currentJobs.get(jobID);
+		if (eg == null) {
+			LOG.info("No job found with ID " + jobID);
+			return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID);
 		}
-	}
 
-	@Override
-	public RegisterTaskManagerResult registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
-									final HardwareDescription hardwareDescription, final IntegerRecord numberOfSlots){
-		if(this.instanceManager != null) {
-			final Runnable registerTaskManagerRunnable = new Runnable() {
-				@Override
-				public void run(){
-					instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription,
-							numberOfSlots.getValue());
-				}
-			};
+		final Runnable cancelJobRunnable = new Runnable() {
+			@Override
+			public void run() {
+				eg.cancel();
+			}
+		};
 
-			this.executorService.execute(registerTaskManagerRunnable);
-			return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
-		}
+		eg.execute(cancelJobRunnable);
 
-		return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.FAILURE);
+		return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
 	}
-
-
+	
 	@Override
-	public void updateTaskExecutionState(final TaskExecutionState executionState) throws IOException {
-
-		// Ignore calls with executionResult == null
-		if (executionState == null) {
-			LOG.error("Received call to updateTaskExecutionState with executionState == null");
-			return;
-		}
+	public void updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
+		Preconditions.checkNotNull(executionState);
 
-		if (executionState.getExecutionState() == ExecutionState.FAILED) {
-			LOG.error(executionState.getDescription());
-		}
 
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(executionState.getJobID());
+		final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID());
 		if (eg == null) {
 			LOG.error("Cannot find execution graph for ID " + executionState.getJobID() + " to change state to "
 				+ executionState.getExecutionState());
 			return;
 		}
 
-		final ExecutionVertex vertex = eg.getVertexByID(executionState.getID());
-		if (vertex == null) {
-			LOG.error("Cannot find vertex with ID " + executionState.getID() + " of job " + eg.getJobID()
-				+ " to change state to " + executionState.getExecutionState());
-			return;
-		}
-
-		// Asynchronously update execute state of vertex
-		vertex.updateExecutionStateAsynchronously(executionState.getExecutionState(), executionState.getDescription());
+		eg.updateState(executionState);
 	}
-
-
+	
 	@Override
-	public JobCancelResult cancelJob(final JobID jobID) throws IOException {
+	public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId) throws IOException {
 
-		LOG.info("Trying to cancel job with ID " + jobID);
-
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-		if (eg == null) {
-			return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID);
+		final ExecutionGraph graph = this.currentJobs.get(jobID);
+		if (graph == null) {
+			LOG.error("Cannot find execution graph to job ID " + jobID);
+			return null;
 		}
 
-		final Runnable cancelJobRunnable = new Runnable() {
-
-			@Override
-			public void run() {
-				eg.updateJobStatus(InternalJobStatus.CANCELING, "Job canceled by user");
-				final TaskCancelResult cancelResult = cancelJob(eg);
-				if (cancelResult != null) {
-					LOG.error(cancelResult.getDescription());
-				}
-			}
-		};
-
-		eg.executeCommand(cancelJobRunnable);
-
-		LOG.info("Cancel of job " + jobID + " successfully triggered");
+		final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
+		if (vertex == null) {
+			LOG.error("Cannot find execution vertex for vertex ID " + vertexId);
+			return null;
+		}
 
-		return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
+		InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+		if (splitAssigner == null) {
+			LOG.error("No InputSplitAssigner for vertex ID " + vertexId);
+			return null;
+		}
+		
+		
+		return splitAssigner.getNextInputSplit(null);
 	}
+	
+	@Override
+	public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
 
-	/**
-	 * Cancels all the tasks in the current and upper stages of the
-	 * given execution graph.
-	 * 
-	 * @param eg
-	 *        the execution graph representing the job to cancel.
-	 * @return <code>null</code> if no error occurred during the cancel attempt,
-	 *         otherwise the returned object will describe the error
-	 */
-	private TaskCancelResult cancelJob(final ExecutionGraph eg) {
-
-		TaskCancelResult errorResult = null;
-
-		/**
-		 * Cancel all nodes in the current and upper execution stages.
-		 */
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(),
-			false, true);
-		while (it.hasNext()) {
+		final JobID jid = executionGraph.getJobID();
+		
+		if (LOG.isInfoEnabled()) {
+			String message = optionalMessage == null ? "." : ": " + optionalMessage;
+			LOG.info(String.format("Status of job %s (%s) changed to %s%s", 
+					jid, executionGraph.getJobName(), newJobStatus, message));
+		}
 
-			final ExecutionVertex vertex = it.next();
-			final TaskCancelResult result = vertex.cancelTask();
-			if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
-				errorResult = result;
+		// remove the execution graph once the job has reached a terminal state
+		if (newJobStatus == JobStatus.FINISHED || newJobStatus == JobStatus.CANCELED || newJobStatus == JobStatus.FAILED) {
+			this.currentJobs.remove(jid);
+			
+			try {
+				LibraryCacheManager.unregister(jid);
+			}
+			catch (Throwable t) {
+				LOG.warn("Could not properly unregister job " + jid + " from the library cache.");
 			}
 		}
-
-		return errorResult;
 	}
 
-
 	@Override
 	public JobProgressResult getJobProgress(final JobID jobID) throws IOException {
 
 		if (this.eventCollector == null) {
-			return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs",
-				null);
+			return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null);
 		}
 
 		final SerializableArrayList<AbstractEvent> eventList = new SerializableArrayList<AbstractEvent>();
@@ -674,104 +498,32 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	@Override
 	public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
 
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
+		final ExecutionGraph eg = this.currentJobs.get(jobID);
 		if (eg == null) {
 			LOG.error("Cannot find execution graph to job ID " + jobID);
 			return ConnectionInfoLookupResponse.createReceiverNotFound();
 		}
 
-		final InternalJobStatus jobStatus = eg.getJobStatus();
-		if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
-			return ConnectionInfoLookupResponse.createJobIsAborting();
-		}
-
-		final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID);
-		if (edge == null) {
-			LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
-			return ConnectionInfoLookupResponse.createReceiverNotFound();
-		}
-
-		if (sourceChannelID.equals(edge.getInputChannelID())) {
-			// Request was sent from an input channel
+		return eg.lookupConnectionInfoAndDeployReceivers(caller, sourceChannelID);
+	}
 
-			final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
-
-			final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
-			if (assignedInstance == null) {
-				LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
-					+ " but no instance assigned");
-				// LOG.info("Created receiverNotReady for " + connectedVertex + " 1");
-				return ConnectionInfoLookupResponse.createReceiverNotReady();
-			}
-
-			// Check execution state
-			final ExecutionState executionState = connectedVertex.getExecutionState();
-			if (executionState == ExecutionState.FINISHED) {
-				// that should not happen. if there is data pending, the receiver cannot be ready
-				return ConnectionInfoLookupResponse.createReceiverNotFound();
-			}
-
-			// running is common, finishing is happens when the lookup is for the close event
-			if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
-				// LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
-				return ConnectionInfoLookupResponse.createReceiverNotReady();
-			}
-
-			if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-				// Receiver runs on the same task manager
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
-			} else {
-				// Receiver runs on a different task manager
-
-				final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
-			}
-		}
-		// else, the request is for an output channel
-		// Find vertex of connected input channel
-		final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
-
-		// Check execution state
-		final ExecutionState executionState = targetVertex.getExecutionState();
-
-		// check whether the task needs to be deployed
-		if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
-
-			if (executionState == ExecutionState.ASSIGNED) {
-				final Runnable command = new Runnable() {
-					@Override
-					public void run() {
-						scheduler.deployAssignedVertices(targetVertex);
-					}
-				};
-				eg.executeCommand(command);
-			}
-
-			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
-			return ConnectionInfoLookupResponse.createReceiverNotReady();
-		}
-
-		final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
-		if (assignedInstance == null) {
-			LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
-			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
-			return ConnectionInfoLookupResponse.createReceiverNotReady();
-		}
-
-		if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-			// Receiver runs on the same task manager
-			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
-		} else {
-			// Receiver runs on a different task manager
-			final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-			final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-
-			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
-		}
+	// --------------------------------------------------------------------------------------------
+	//  Properties
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Tests whether the job manager has been shut down completely.
+	 * 
+	 * @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
+	 */
+	public boolean isShutDown() {
+		return this.isShutDown;
 	}
-
+	
+	public InstanceManager getInstanceManager() {
+		return this.instanceManager;
+	}
+	
 	/**
 	 * Returns current ManagementGraph from eventCollector and, if not current, from archive
 	 * 
@@ -828,239 +580,10 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		return eventList;
 	}
 
-
-	@Override
-	public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
-
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-		if (eg == null) {
-			LOG.error("Cannot find execution graph for job " + jobID);
-			return;
-		}
-
-		final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
-		if (vertex == null) {
-			LOG.error("Cannot find execution vertex with ID " + id);
-			return;
-		}
-
-		LOG.info("Killing task " + vertex + " of job " + jobID);
-
-		final Runnable runnable = new Runnable() {
-
-			@Override
-			public void run() {
-
-				final TaskKillResult result = vertex.killTask();
-				if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
-					LOG.error(result.getDescription());
-				}
-			}
-		};
-
-		eg.executeCommand(runnable);
-	}
-
-	/**
-	 * Tests whether the job manager has been shut down completely.
-	 * 
-	 * @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
-	 */
-	public boolean isShutDown() {
-		return this.isShutDown;
-	}
-
-
-
-	@Override
-	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-			final String optionalMessage) {
-
-		LOG.info("Status of job " + executionGraph.getJobName() + "(" + executionGraph.getJobID() + ")"
-			+ " changed to " + newJobStatus);
-
-		if (newJobStatus == InternalJobStatus.FAILING) {
-
-			// Cancel all remaining tasks
-			cancelJob(executionGraph);
-		}
-
-		if (newJobStatus == InternalJobStatus.CANCELED || newJobStatus == InternalJobStatus.FAILED
-			|| newJobStatus == InternalJobStatus.FINISHED) {
-			// Unregister job for Nephele's monitoring, optimization components, and dynamic input split assignment
-			unregisterJob(executionGraph);
-		}
-	}
-
-
-	@Override
-	public void logBufferUtilization(final JobID jobID) throws IOException {
-
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-		if (eg == null) {
-			return;
-		}
-
-		final Set<Instance> allocatedInstance = new HashSet<Instance>();
-
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
-		while (it.hasNext()) {
-
-			final ExecutionVertex vertex = it.next();
-			final ExecutionState state = vertex.getExecutionState();
-			if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHING) {
-				final Instance instance = vertex.getAllocatedResource().getInstance();
-
-				if (instance instanceof DummyInstance) {
-					LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state "
-						+ state + ")");
-					continue;
-				}
-
-				allocatedInstance.add(instance);
-			}
-		}
-
-		// Send requests to task managers from separate thread
-		final Runnable requestRunnable = new Runnable() {
-
-			@Override
-			public void run() {
-
-				final Iterator<Instance> it2 = allocatedInstance.iterator();
-
-				try {
-					while (it2.hasNext()) {
-						it2.next().logBufferUtilization();
-					}
-				} catch (IOException ioe) {
-					LOG.error(ioe);
-				}
-
-			}
-		};
-
-		// Hand over to the executor service
-		this.executorService.execute(requestRunnable);
-	}
-
 	@Override
 	public int getAvailableSlots() {
 		return getInstanceManager().getTotalNumberOfSlots();
 	}
-
-
-	@Override
-	public void deploy(final JobID jobID, final Instance instance,
-			final List<ExecutionVertex> verticesToBeDeployed) {
-
-		if (verticesToBeDeployed.isEmpty()) {
-			LOG.error("Method 'deploy' called but list of vertices to be deployed is empty");
-			return;
-		}
-
-		for (final ExecutionVertex vertex : verticesToBeDeployed) {
-
-			// Check vertex state
-			if (vertex.getExecutionState() != ExecutionState.READY) {
-				LOG.error("Expected vertex " + vertex + " to be in state READY but it is in state "
-					+ vertex.getExecutionState());
-			}
-
-			vertex.updateExecutionState(ExecutionState.STARTING, null);
-		}
-
-		// Create a new runnable and pass it the executor service
-		final Runnable deploymentRunnable = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-
-				// Check if all required libraries are available on the instance
-				try {
-					instance.checkLibraryAvailability(jobID);
-				} catch (IOException ioe) {
-					LOG.error("Cannot check library availability: " + StringUtils.stringifyException(ioe));
-				}
-
-				final List<TaskDeploymentDescriptor> submissionList = new SerializableArrayList<TaskDeploymentDescriptor>();
-
-				// Check the consistency of the call
-				for (final ExecutionVertex vertex : verticesToBeDeployed) {
-
-					submissionList.add(vertex.constructDeploymentDescriptor());
-
-					LOG.info("Starting task " + vertex + " on " + vertex.getAllocatedResource().getInstance());
-				}
-
-				List<TaskSubmissionResult> submissionResultList = null;
-
-				try {
-					submissionResultList = instance.submitTasks(submissionList);
-				} catch (final IOException ioe) {
-					final String errorMsg = StringUtils.stringifyException(ioe);
-					for (final ExecutionVertex vertex : verticesToBeDeployed) {
-						vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, errorMsg);
-					}
-				}
-
-				if (verticesToBeDeployed.size() != submissionResultList.size()) {
-					LOG.error("size of submission result list does not match size of list with vertices to be deployed");
-				}
-
-				int count = 0;
-				for (final TaskSubmissionResult tsr : submissionResultList) {
-
-					ExecutionVertex vertex = verticesToBeDeployed.get(count++);
-					if (!vertex.getID().equals(tsr.getVertexID())) {
-						LOG.error("Expected different order of objects in task result list");
-						vertex = null;
-						for (final ExecutionVertex candVertex : verticesToBeDeployed) {
-							if (tsr.getVertexID().equals(candVertex.getID())) {
-								vertex = candVertex;
-								break;
-							}
-						}
-
-						if (vertex == null) {
-							LOG.error("Cannot find execution vertex for vertex ID " + tsr.getVertexID());
-							continue;
-						}
-					}
-
-					if (tsr.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
-						// Change the execution state to failed and let the scheduler deal with the rest
-						vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, tsr.getDescription());
-					}
-				}
-			}
-		};
-
-		this.executorService.execute(deploymentRunnable);
-	}
-
-
-	@Override
-	public InputSplitWrapper requestNextInputSplit(final JobID jobID, final ExecutionVertexID vertexID,
-			final IntegerRecord sequenceNumber) throws IOException {
-
-		final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
-		if (graph == null) {
-			LOG.error("Cannot find execution graph to job ID " + jobID);
-			return null;
-		}
-
-		final ExecutionVertex vertex = graph.getVertexByID(vertexID);
-		if (vertex == null) {
-			LOG.error("Cannot find execution vertex for vertex ID " + vertexID);
-			return null;
-		}
-
-		return new InputSplitWrapper(jobID, this.inputSplitManager.getNextInputSplit(vertex, sequenceNumber.getValue()));
-	}
 	
 	/**
 	 * Starts the Jetty Infoserver for the Jobmanager
@@ -1081,17 +604,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	}
 	
 	
-	// TODO Add to RPC?
 	public List<RecentJobEvent> getOldJobs() throws IOException {
-
-		//final List<RecentJobEvent> eventList = new SerializableArrayList<RecentJobEvent>();
-
 		if (this.archive == null) {
 			throw new IOException("No instance of the event collector found");
 		}
 
-		//this.eventCollector.getRecentJobs(eventList);
-
 		return this.archive.getJobs();
 	}
 	
@@ -1103,8 +620,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		return this.instanceManager.getNumberOfRegisteredTaskManagers();
 	}
 	
-	public Map<InstanceConnectionInfo, Instance> getInstances() {
-		return this.instanceManager.getInstances();
+	public Map<InstanceID, Instance> getInstances() {
+		return this.instanceManager.getAllRegisteredInstances();
 	}
 
 	@Override
@@ -1120,4 +637,118 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	public Map<String, Accumulator<?, ?>> getAccumulators(JobID jobID) {
 		return this.accumulatorManager.getJobAccumulators(jobID);
 	}
+	
+	public Map<JobID, ExecutionGraph> getCurrentJobs() {
+		return Collections.unmodifiableMap(currentJobs);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  TaskManager to JobManager communication
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean sendHeartbeat(InstanceID taskManagerId) {
+		return this.instanceManager.reportHeartBeat(taskManagerId);
+	}
+
+	@Override
+	public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
+		return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+	}
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//  Executable
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Entry point for the program
+	 * 
+	 * @param args
+	 *        arguments from the command line
+	 */
+	
+	public static void main(String[] args) {
+		// determine if a valid log4j config exists and initialize a default logger if not
+		if (System.getProperty("log4j.configuration") == null) {
+			Logger root = Logger.getRootLogger();
+			root.removeAllAppenders();
+			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
+			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
+			root.addAppender(appender);
+			root.setLevel(Level.INFO);
+		}
+		
+		JobManager jobManager;
+		try {
+			jobManager = initialize(args);
+			// Start info server for jobmanager
+			jobManager.startInfoServer();
+		}
+		catch (Exception e) {
+			LOG.fatal(e.getMessage(), e);
+			System.exit(FAILURE_RETURN_CODE);
+		}
+		
+		// Clean up is triggered through a shutdown hook
+		// freeze this thread to keep the JVM alive (the job manager threads are daemon threads)
+		Object w = new Object();
+		synchronized (w) {
+			try {
+				w.wait();
+			} catch (InterruptedException e) {}
+		}
+	}
+	
+	@SuppressWarnings("static-access")
+	public static JobManager initialize(String[] args) throws Exception {
+		final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
+			.withDescription("Specify configuration directory.").create("configDir");
+
+		final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg()
+			.withDescription("Specify execution mode.").create("executionMode");
+
+		final Options options = new Options();
+		options.addOption(configDirOpt);
+		options.addOption(executionModeOpt);
+
+		CommandLineParser parser = new GnuParser();
+		CommandLine line = null;
+		try {
+			line = parser.parse(options, args);
+		} catch (ParseException e) {
+			LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
+			System.exit(FAILURE_RETURN_CODE);
+		}
+
+		final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
+		final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local");
+		
+		ExecutionMode executionMode = null;
+		if ("local".equals(executionModeName)) {
+			executionMode = ExecutionMode.LOCAL;
+		} else if ("cluster".equals(executionModeName)) {
+			executionMode = ExecutionMode.CLUSTER;
+		} else {
+			System.err.println("Unrecognized execution mode: " + executionModeName);
+			System.exit(FAILURE_RETURN_CODE);
+		}
+		
+		// print some startup environment info, like user, code revision, etc
+		EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
+		
+		// First, try to load global configuration
+		GlobalConfiguration.loadConfiguration(configDir);
+
+		// Create a new job manager object
+		JobManager jobManager = new JobManager(executionMode);
+		
+		// Set base dir for info server
+		Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
+		if (configDir != null && new File(configDir).isDirectory()) {
+			infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/..");
+		}
+		GlobalConfiguration.includeConfiguration(infoserverConfig);
+		return jobManager;
+	}
 }


Mime
View raw message