flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [13/63] [abbrv] Refactor job graph construction to incremental attachment based
Date Sun, 21 Sep 2014 02:12:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index bc95250..da1d28c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,1321 +18,1497 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.InitializeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
+import org.apache.flink.runtime.io.network.RemoteReceiver;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.jobgraph.AbstractJobInputVertex;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
-/**
- * In Nephele an execution graph is the main data structure for scheduling, executing and
- * observing a job. An execution graph is created from an job graph. In contrast to a job graph
- * it can contain communication edges of specific types, sub groups of vertices and information on
- * when and where (on which instance) to run particular tasks.
- * <p>
- * This class is thread-safe.
- * 
- */
-public class ExecutionGraph implements ExecutionListener {
 
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
+public class ExecutionGraph {
 
-	/**
-	 * The ID of the job this graph has been built for.
-	 */
+	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
+	
+	/** The log object used for debugging. */
+	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
+
+	// --------------------------------------------------------------------------------------------
+	
+	/** The ID of the job this graph has been built for. */
 	private final JobID jobID;
 
-	/**
-	 * The name of the original job graph.
-	 */
+	/** The name of the original job graph. */
 	private final String jobName;
 
-	/**
-	 * Mapping of vertex IDs to vertices.
-	 */
-	private final ConcurrentMap<ExecutionVertexID, ExecutionVertex> vertexMap = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>(
-		1024);
-
-	/**
-	 * Mapping of channel IDs to edges.
-	 */
-	private final ConcurrentMap<ChannelID, ExecutionEdge> edgeMap = new ConcurrentHashMap<ChannelID, ExecutionEdge>(
-		1024 * 1024);
-
-	/**
-	 * List of stages in the graph.
-	 */
-	private final CopyOnWriteArrayList<ExecutionStage> stages = new CopyOnWriteArrayList<ExecutionStage>();
-
-	/**
-	 * The executor service to asynchronously perform update operations to this graph.
-	 */
-	private final ExecutorService executorService = Executors.newSingleThreadExecutor(ExecutorThreadFactory.INSTANCE);
-
-	/**
-	 * Index to the current execution stage.
-	 */
-	private volatile int indexToCurrentExecutionStage = 0;
-
-	/**
-	 * The job configuration that was originally attached to the JobGraph.
-	 */
+	/** The job configuration that was originally attached to the JobGraph. */
 	private final Configuration jobConfiguration;
+	
+	/** All job vertices that are part of this graph */
+	private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
+	
+	/** All intermediate results that are part of this graph */
+	private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
+	
+	/** An executor that can run long actions (involving remote calls) */
+	private final ExecutorService executor;
+	
+	
+	private final List<String> userCodeJarFiles;
+	
+	private final List<JobStatusListener> jobStatusListeners;
+	
+	private final List<ExecutionListener> executionListeners;
+	
+	
+	private DefaultScheduler jobScheduler;
+	
+	private boolean allowQueuedScheduling = false;
 
-	/**
-	 * The current status of the job which is represented by this execution graph.
-	 */
-	private final AtomicReference<InternalJobStatus> jobStatus = new AtomicReference<InternalJobStatus>(
-		InternalJobStatus.CREATED);
-
-	/**
-	 * The error description of the first task which causes this job to fail.
-	 */
-	private volatile String errorDescription = null;
-
-	/**
-	 * List of listeners which are notified in case the status of this job has changed.
-	 */
-	private final CopyOnWriteArrayList<JobStatusListener> jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
-
-	/**
-	 * List of listeners which are notified in case the execution stage of a job has changed.
-	 */
-	private final CopyOnWriteArrayList<ExecutionStageListener> executionStageListeners = new CopyOnWriteArrayList<ExecutionStageListener>();
-
-	/**
-	 * Private constructor used for duplicating execution vertices.
-	 * 
-	 * @param jobID
-	 *        the ID of the duplicated execution graph
-	 * @param jobName
-	 *        the name of the original job graph
-	 * @param jobConfiguration
-	 *        the configuration originally attached to the job graph
-	 */
-	private ExecutionGraph(final JobID jobID, final String jobName, final Configuration jobConfiguration) {
-		if (jobID == null) {
-			throw new IllegalArgumentException("Argument jobID must not be null");
-		}
-
-		this.jobID = jobID;
-		this.jobName = jobName;
-		this.jobConfiguration = jobConfiguration;
+	
+	private volatile JobStatus state = JobStatus.CREATED;
+	
+	
+	
+	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
+		this(jobId, jobName, jobConfig, null);
 	}
-
-	/**
-	 * Creates a new execution graph from a job graph.
-	 * 
-	 * @param job
-	 *        the user's job graph
-	 * @param defaultParallelism
-	 *        defaultParallelism in case that nodes have no parallelism set
-	 * @throws GraphConversionException
-	 *         thrown if the job graph is not valid and no execution graph can be constructed from it
-	 */
-	public ExecutionGraph(JobGraph job, int defaultParallelism) throws GraphConversionException {
-		this(job.getJobID(), job.getName(), job.getJobConfiguration());
-
-		// Start constructing the new execution graph from given job graph
-		try {
-			constructExecutionGraph(job, defaultParallelism);
-		} catch (GraphConversionException e) {
-			throw e; // forward graph conversion exceptions
-		} catch (Exception e) {
-			throw new GraphConversionException(StringUtils.stringifyException(e));
+	
+	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, ExecutorService executor) {
+		if (jobId == null || jobName == null || jobConfig == null) {
+			throw new NullPointerException();
 		}
+		
+		this.jobID = jobId;
+		this.jobName = jobName;
+		this.jobConfiguration = jobConfig;
+		this.executor = executor;
+		
+		this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
+		this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
+		
+		this.userCodeJarFiles = new ArrayList<String>();
+		this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
+		this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
 	}
 
-	/**
-	 * Applies the user defined settings to the execution graph.
-	 * 
-	 * @param temporaryGroupVertexMap
-	 *        mapping between job vertices and the corresponding group vertices.
-	 * @throws GraphConversionException
-	 *         thrown if an error occurs while applying the user settings.
-	 */
-	private void applyUserDefinedSettings(final HashMap<AbstractJobVertex, ExecutionGroupVertex> temporaryGroupVertexMap)
-			throws GraphConversionException {
-
-		// The check for cycles in the dependency chain for instance sharing is already checked in
-		// <code>submitJob</code> method of the job manager
-
-		// If there is no cycle, apply the settings to the corresponding group vertices
-		final Iterator<Map.Entry<AbstractJobVertex, ExecutionGroupVertex>> it = temporaryGroupVertexMap.entrySet()
-			.iterator();
-		while (it.hasNext()) {
-
-			final Map.Entry<AbstractJobVertex, ExecutionGroupVertex> entry = it.next();
-			final AbstractJobVertex jobVertex = entry.getKey();
-			if (jobVertex.getVertexToShareInstancesWith() != null) {
-
-				final AbstractJobVertex vertexToShareInstancesWith = jobVertex.getVertexToShareInstancesWith();
-				final ExecutionGroupVertex groupVertex = entry.getValue();
-				final ExecutionGroupVertex groupVertexToShareInstancesWith = temporaryGroupVertexMap
-					.get(vertexToShareInstancesWith);
-				groupVertex.shareInstancesWith(groupVertexToShareInstancesWith);
-			}
+	// --------------------------------------------------------------------------------------------
+	
+	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
+					+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
 		}
-
-		// Second, we create the number of execution vertices each group vertex is supposed to manage
-		Iterator<ExecutionGroupVertex> it2 = new ExecutionGroupVertexIterator(this, true, -1);
-		while (it2.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it2.next();
-			if (groupVertex.isNumberOfMembersUserDefined()) {
-				groupVertex.createInitialExecutionVertices(groupVertex.getUserDefinedNumberOfMembers());
+		
+		for (AbstractJobVertex jobVertex : topologiallySorted) {
+			
+			// create the execution job vertex and attach it to the graph
+			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1);
+			ejv.connectToPredecessors(this.intermediateResults);
+			
+			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
+			if (previousTask != null) {
+				throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
+						jobVertex.getID(), ejv, previousTask));
 			}
-		}
-
-		// Finally, apply the channel settings channel settings
-		it2 = new ExecutionGroupVertexIterator(this, true, -1);
-		while (it2.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it2.next();
-			for (int i = 0; i < groupVertex.getNumberOfForwardLinks(); i++) {
-
-				final ExecutionGroupEdge edge = groupVertex.getForwardEdge(i);
-				if (edge.isChannelTypeUserDefined()) {
-					edge.changeChannelType(edge.getChannelType());
+			
+			for (IntermediateResult res : ejv.getProducedDataSets()) {
+				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
+				if (previousDataSet != null) {
+					throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
+							res.getId(), res, previousDataSet));
 				}
-
-				// Create edges between execution vertices
-				createExecutionEdgesForGroupEdge(edge);
 			}
 		}
-
-		// Repair the instance assignment after having changed the channel types
-		repairInstanceAssignment();
-
-		// Repair the instance sharing among different group vertices
-		repairInstanceSharing();
-
-		// Finally, repair the stages
-		repairStages();
 	}
-
-	/**
-	 * Sets up an execution graph from a job graph.
-	 * 
-	 * @param jobGraph
-	 *        the job graph to create the execution graph from
-	 * @param defaultParallelism
-	 *        defaultParallelism in case that nodes have no parallelism set
-	 * @throws GraphConversionException
-	 *         thrown if the job graph is not valid and no execution graph can be constructed from it
-	 */
-	private void constructExecutionGraph(final JobGraph jobGraph, final int defaultParallelism)
-			throws GraphConversionException {
-
-		// Clean up temporary data structures
-		final HashMap<AbstractJobVertex, ExecutionVertex> temporaryVertexMap = new HashMap<AbstractJobVertex, ExecutionVertex>();
-		final HashMap<AbstractJobVertex, ExecutionGroupVertex> temporaryGroupVertexMap = new HashMap<AbstractJobVertex, ExecutionGroupVertex>();
-
-		// Initially, create only one execution stage that contains all group vertices
-		final ExecutionStage initialExecutionStage = new ExecutionStage(this, 0);
-		this.stages.add(initialExecutionStage);
-
-		// Convert job vertices to execution vertices and initialize them
-		final AbstractJobVertex[] all = jobGraph.getAllJobVertices();
-		for (int i = 0; i < all.length; i++) {
-			if(all[i].getNumberOfSubtasks() == -1){
-				all[i].setNumberOfSubtasks(defaultParallelism);
-			}
-
-			final ExecutionVertex createdVertex = createVertex(all[i], initialExecutionStage);
-			temporaryVertexMap.put(all[i], createdVertex);
-			temporaryGroupVertexMap.put(all[i], createdVertex.getGroupVertex());
-		}
-
-		// Create initial edges between the vertices
-		createInitialGroupEdges(temporaryVertexMap);
-
-		// Now that an initial graph is built, apply the user settings
-		applyUserDefinedSettings(temporaryGroupVertexMap);
-
-		// Calculate the connection IDs
-		calculateConnectionIDs();
-
-		// Finally, construct the execution pipelines
-		reconstructExecutionPipelines();
+	
+	public void addUserCodeJarFile(String jarFile) {
+		this.userCodeJarFiles.add(jarFile);
 	}
-
-	private void createExecutionEdgesForGroupEdge(final ExecutionGroupEdge groupEdge) {
-
-		final ExecutionGroupVertex source = groupEdge.getSourceVertex();
-		final int indexOfOutputGate = groupEdge.getIndexOfOutputGate();
-		final ExecutionGroupVertex target = groupEdge.getTargetVertex();
-		final int indexOfInputGate = groupEdge.getIndexOfInputGate();
-
-		final Map<GateID, List<ExecutionEdge>> inputChannelMap = new HashMap<GateID, List<ExecutionEdge>>();
-
-		// Unwire the respective gate of the source vertices
-		final int currentNumberOfSourceNodes = source.getCurrentNumberOfGroupMembers();
-		for (int i = 0; i < currentNumberOfSourceNodes; ++i) {
-
-			final ExecutionVertex sourceVertex = source.getGroupMember(i);
-			final ExecutionGate outputGate = sourceVertex.getOutputGate(indexOfOutputGate);
-			if (outputGate == null) {
-				throw new IllegalStateException("wire: " + sourceVertex.getName()
-					+ " has no output gate with index " + indexOfOutputGate);
-			}
-
-			if (outputGate.getNumberOfEdges() > 0) {
-				throw new IllegalStateException("wire: wire called on source " + sourceVertex.getName() + " (" + i
-					+ "), but number of output channels is " + outputGate.getNumberOfEdges() + "!");
+	
+	public String[] getUserCodeJarFiles() {
+		return (String[]) this.userCodeJarFiles.toArray(new String[this.userCodeJarFiles.size()]);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public JobID getJobID() {
+		return jobID;
+	}
+	
+	public String getJobName() {
+		return jobName;
+	}
+	
+	public Configuration getJobConfiguration() {
+		return jobConfiguration;
+	}
+	
+	public JobStatus getState() {
+		return state;
+	}
+	
+	public ExecutionJobVertex getJobVertex(JobVertexID id) {
+		return this.tasks.get(id);
+	}
+	
+	public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
+		return Collections.unmodifiableMap(this.tasks);
+	}
+	
+	public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
+		return Collections.unmodifiableMap(this.intermediateResults);
+	}
+	
+	public Iterable<ExecutionVertex2> getAllExecutionVertices() {
+		return new Iterable<ExecutionVertex2>() {
+			@Override
+			public Iterator<ExecutionVertex2> iterator() {
+				return new AllVerticesIterator(tasks.values().iterator());
 			}
-
-			final int currentNumberOfTargetNodes = target.getCurrentNumberOfGroupMembers();
-			final List<ExecutionEdge> outputChannels = new ArrayList<ExecutionEdge>();
-
-			for (int j = 0; j < currentNumberOfTargetNodes; ++j) {
-
-				final ExecutionVertex targetVertex = target.getGroupMember(j);
-				final ExecutionGate inputGate = targetVertex.getInputGate(indexOfInputGate);
-				if (inputGate == null) {
-					throw new IllegalStateException("wire: " + targetVertex.getName()
-						+ " has no input gate with index " + indexOfInputGate);
-				}
-
-				if (inputGate.getNumberOfEdges() > 0 && i == 0) {
-					throw new IllegalStateException("wire: wire called on target " + targetVertex.getName() + " ("
-						+ j + "), but number of input channels is " + inputGate.getNumberOfEdges() + "!");
-				}
-
-				// Check if a wire is supposed to be created
-				if (DistributionPatternProvider.createWire(groupEdge.getDistributionPattern(),
-					i, j, currentNumberOfSourceNodes, currentNumberOfTargetNodes)) {
-
-					final ChannelID outputChannelID = new ChannelID();
-					final ChannelID inputChannelID = new ChannelID();
-
-					final ExecutionEdge edge = new ExecutionEdge(outputGate, inputGate, groupEdge, outputChannelID,
-						inputChannelID, outputGate.getNumberOfEdges(), inputGate.getNumberOfEdges());
-
-					this.edgeMap.put(outputChannelID, edge);
-					this.edgeMap.put(inputChannelID, edge);
-
-					outputChannels.add(edge);
-
-					List<ExecutionEdge> inputChannels = inputChannelMap.get(inputGate.getGateID());
-					if (inputChannels == null) {
-						inputChannels = new ArrayList<ExecutionEdge>();
-						inputChannelMap.put(inputGate.getGateID(), inputChannels);
+		};
+	}
+	
+	public boolean isQueuedSchedulingAllowed() {
+		return this.allowQueuedScheduling;
+	}
+	
+	public void setQueuedSchedulingAllowed(boolean allowed) {
+		this.allowQueuedScheduling = allowed;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public void registerJobStatusListener(JobStatusListener jobStatusListener) {
+		this.jobStatusListeners.add(jobStatusListener);
+	}
+	
+	public void registerExecutionListener(ExecutionListener executionListener) {
+		this.executionListeners.add(executionListener);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public void scheduleForExecution(DefaultScheduler scheduler) throws JobException {
+		if (scheduler == null) {
+			throw new IllegalArgumentException("Scheduler must not be null.");
+		}
+		
+		if (STATE_UPDATER.compareAndSet(this, JobStatus.CREATED, JobStatus.RUNNING)) {
+			this.jobScheduler = scheduler;
+			
+			notifyJobStatusChange(JobStatus.RUNNING, null);
+			
+			// initially, we simply take the ones without inputs.
+			// next, we implement the logic to go back from vertices that need computation
+			// to the ones we need to start running
+			for (ExecutionJobVertex ejv : this.tasks.values()) {
+				if (ejv.getJobVertex().isInputVertex()) {
+					for (ExecutionVertex2 ev : ejv.getTaskVertices()) {
+						ev.scheduleForExecution(scheduler);
 					}
-
-					inputChannels.add(edge);
 				}
 			}
-
-			outputGate.replaceAllEdges(outputChannels);
 		}
-
-		// Finally, set the channels for the input gates
-		final int currentNumberOfTargetNodes = target.getCurrentNumberOfGroupMembers();
-		for (int i = 0; i < currentNumberOfTargetNodes; ++i) {
-
-			final ExecutionVertex targetVertex = target.getGroupMember(i);
-			final ExecutionGate inputGate = targetVertex.getInputGate(indexOfInputGate);
-
-			final List<ExecutionEdge> inputChannels = inputChannelMap.get(inputGate.getGateID());
-			if (inputChannels == null) {
-				LOG.error("Cannot find input channels for gate ID " + inputGate.getGateID());
-				continue;
-			}
-
-			inputGate.replaceAllEdges(inputChannels);
+		else {
+			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
 		}
-
 	}
-
-	/**
-	 * Creates the initial edges between the group vertices
-	 * 
-	 * @param vertexMap
-	 *        the temporary vertex map
-	 * @throws GraphConversionException
-	 *         if the initial wiring cannot be created
-	 */
-	private void createInitialGroupEdges(final HashMap<AbstractJobVertex, ExecutionVertex> vertexMap)
-			throws GraphConversionException {
-
-		Iterator<Map.Entry<AbstractJobVertex, ExecutionVertex>> it = vertexMap.entrySet().iterator();
-
-		while (it.hasNext()) {
-
-			final Map.Entry<AbstractJobVertex, ExecutionVertex> entry = it.next();
-			final AbstractJobVertex sjv = entry.getKey();
-			final ExecutionVertex sev = entry.getValue();
-			final ExecutionGroupVertex sgv = sev.getGroupVertex();
-
-			// First, build the group edges
-			for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
-				final JobEdge edge = sjv.getForwardConnection(i);
-				final AbstractJobVertex tjv = edge.getConnectedVertex();
-
-				final ExecutionVertex tev = vertexMap.get(tjv);
-				final ExecutionGroupVertex tgv = tev.getGroupVertex();
-				// Use NETWORK as default channel type if nothing else is defined by the user
-				ChannelType channelType = edge.getChannelType();
-				boolean userDefinedChannelType = true;
-				if (channelType == null) {
-					userDefinedChannelType = false;
-					channelType = ChannelType.NETWORK;
-				}
-
-				final DistributionPattern distributionPattern = edge.getDistributionPattern();
-
-				// Connect the corresponding group vertices and copy the user settings from the job edge
-				final ExecutionGroupEdge groupEdge = sgv.wireTo(tgv, edge.getIndexOfInputGate(), i, channelType,
-					userDefinedChannelType,distributionPattern);
-
-				final ExecutionGate outputGate = new ExecutionGate(new GateID(), sev, groupEdge, false);
-				sev.insertOutputGate(i, outputGate);
-				final ExecutionGate inputGate = new ExecutionGate(new GateID(), tev, groupEdge, true);
-				tev.insertInputGate(edge.getIndexOfInputGate(), inputGate);
-			}
-		}
+	
+	public void cancel() {
+		//TODO
 	}
-
+	
+	public void updateState(TaskExecutionState state) {
+		//TODO		
+	}
+	
+	public ConnectionInfoLookupResponse lookupConnectionInfoAndDeployReceivers(InstanceConnectionInfo caller, ChannelID sourceChannelID) {
+		//TODO
+		return null;
+		
+//		final InternalJobStatus jobStatus = eg.getJobStatus();
+//		if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
+//			return ConnectionInfoLookupResponse.createJobIsAborting();
+//		}
+//
+//		final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID);
+//		if (edge == null) {
+//			LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
+//			return ConnectionInfoLookupResponse.createReceiverNotFound();
+//		}
+//
+//		if (sourceChannelID.equals(edge.getInputChannelID())) {
+//			// Request was sent from an input channel
+//
+//			final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
+//
+//			final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
+//			if (assignedInstance == null) {
+//				LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
+//					+ " but no instance assigned");
+//				// LOG.info("Created receiverNotReady for " + connectedVertex + " 1");
+//				return ConnectionInfoLookupResponse.createReceiverNotReady();
+//			}
+//
+//			// Check execution state
+//			final ExecutionState executionState = connectedVertex.getExecutionState();
+//			if (executionState == ExecutionState.FINISHED) {
+//				// that should not happen. if there is data pending, the receiver cannot be ready
+//				return ConnectionInfoLookupResponse.createReceiverNotFound();
+//			}
+//
+//			// running is common; finishing happens when the lookup is for the close event
+//			if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
+//				// LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
+//				return ConnectionInfoLookupResponse.createReceiverNotReady();
+//			}
+//
+//			if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
+//				// Receiver runs on the same task manager
+//				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
+//			} else {
+//				// Receiver runs on a different task manager
+//
+//				final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
+//				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+//
+//				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
+//			}
+//		}
+//		// else, the request is for an output channel
+//		// Find vertex of connected input channel
+//		final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
+//
+//		// Check execution state
+//		final ExecutionState executionState = targetVertex.getExecutionState();
+//
+//		// check whether the task needs to be deployed
+//		if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
+//
+//			if (executionState == ExecutionState.ASSIGNED) {
+//				final Runnable command = new Runnable() {
+//					@Override
+//					public void run() {
+//						scheduler.deployAssignedVertices(targetVertex);
+//					}
+//				};
+//				eg.executeCommand(command);
+//			}
+//
+//			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
+//			return ConnectionInfoLookupResponse.createReceiverNotReady();
+//		}
+//
+//		final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
+//		if (assignedInstance == null) {
+//			LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
+//			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
+//			return ConnectionInfoLookupResponse.createReceiverNotReady();
+//		}
+//
+//		if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
+//			// Receiver runs on the same task manager
+//			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
+//		} else {
+//			// Receiver runs on a different task manager
+//			final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
+//			final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+//
+//			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
+//		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	
 	/**
-	 * Creates an execution vertex from a job vertex.
+	 * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
 	 * 
-	 * @param jobVertex
-	 *        the job vertex to create the execution vertex from
-	 * @param initialExecutionStage
-	 *        the initial execution stage all group vertices are added to
-	 * @return the new execution vertex
-	 * @throws GraphConversionException
-	 *         thrown if the job vertex is of an unknown subclass
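+	 * @param newState the job status that is passed on to every registered job status listener
+	 * @param message an optional message passed on to the listeners together with the status (may be null)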
 	 */
-	private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final ExecutionStage initialExecutionStage)
-			throws GraphConversionException {
-
-		// Create an initial execution vertex for the job vertex
-		final Class<? extends AbstractInvokable> invokableClass = jobVertex.getInvokableClass();
-		if (invokableClass == null) {
-			throw new GraphConversionException("JobVertex " + jobVertex.getID() + " (" + jobVertex.getName()
-				+ ") does not specify a task");
-		}
-
-		// Calculate the cryptographic signature of this vertex
-		final ExecutionSignature signature = ExecutionSignature.createSignature(jobVertex.getInvokableClass(),
-			jobVertex.getJobGraph().getJobID());
-
-		// Create a group vertex for the job vertex
-
-		ExecutionGroupVertex groupVertex = null;
-		try {
-			groupVertex = new ExecutionGroupVertex(jobVertex.getName(), jobVertex.getID(), this,
-				jobVertex.getNumberOfSubtasks(), jobVertex.getVertexToShareInstancesWith() != null ? true
-					: false, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature,
-				invokableClass);
-		} catch (Throwable t) {
-			throw new GraphConversionException(t);
-		}
-
-		// Register input and output vertices separately
-		if (jobVertex instanceof AbstractJobInputVertex) {
-
-			final AbstractJobInputVertex jobInputVertex = (AbstractJobInputVertex) jobVertex;
-			
-			if (jobVertex instanceof JobInputVertex) {
-				try {
-					// get a handle to the user code class loader
-					ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
-					
-					((JobInputVertex) jobVertex).initializeInputFormatFromTaskConfig(cl);
-				}
-				catch (Throwable t) {
-					throw new GraphConversionException("Could not deserialize input format.", t);
-				}
-			}
-			
-			final Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
-			
-			InputSplit[] inputSplits;
-
+	private void notifyJobStatusChange(JobStatus newState, String message) {
+		for (JobStatusListener listener : this.jobStatusListeners) {
 			try {
-				inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
+				listener.jobStatusHasChanged(this, newState, message);
 			}
 			catch (Throwable t) {
-				throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), t);
-			}
-
-			if (inputSplits == null) {
-				inputSplits = new InputSplit[0];
-			}
-			
-			LOG.info("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length + " input splits");
-
-			// assign input splits and type
-			groupVertex.setInputSplits(inputSplits);
-			groupVertex.setInputSplitType(inputSplitType);
-		}
-
-		if (jobVertex instanceof JobOutputVertex){
-			final JobOutputVertex jobOutputVertex = (JobOutputVertex) jobVertex;
-			
-			try {
-				// get a handle to the user code class loader
-				ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
-				jobOutputVertex.initializeOutputFormatFromTaskConfig(cl);
-			}
-			catch (Throwable t) {
-				throw new GraphConversionException("Could not deserialize output format.", t);
-			}
-
-			OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
-			if (outputFormat != null && outputFormat instanceof InitializeOnMaster){
-				try {
-					((InitializeOnMaster) outputFormat).initializeGlobal(jobVertex.getNumberOfSubtasks());
-				}
-				catch (Throwable t) {
-					throw new GraphConversionException(t);
-				}
+				LOG.error("Notification of job status change caused an error.", t);
 			}
 		}
-
-		// Add group vertex to initial execution stage
-		initialExecutionStage.addStageMember(groupVertex);
-
-		final ExecutionVertex ev = new ExecutionVertex(this, groupVertex, jobVertex.getNumberOfForwardConnections(),
-			jobVertex.getNumberOfBackwardConnections());
-
-		// Assign initial instance to vertex (may be overwritten later on when user settings are applied)
-		ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(), null));
-
-		return ev;
-	}
-
-	/**
-	 * Returns the number of input vertices registered with this execution graph.
-	 * 
-	 * @return the number of input vertices registered with this execution graph
-	 */
-	public int getNumberOfInputVertices() {
-		return this.stages.get(0).getNumberOfInputExecutionVertices();
 	}
-
+	
 	/**
-	 * Returns the number of input vertices for the given stage.
+	 * Notifies the execution listeners of an execution state change. NOTE: This method never throws an error, it only logs errors caused by the notified listeners.
 	 * 
-	 * @param stage
-	 *        the index of the execution stage
-	 * @return the number of input vertices for the given stage
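+	 * @param vertexId the ID of the job vertex that is reported to the listeners
+	 * @param subtask the number of the subtask that is reported to the listeners
+	 * @param newExecutionState the new execution state that is reported to the listeners
+	 * @param optionalMessage an optional message that is passed on to the listeners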
 	 */
-	public int getNumberOfInputVertices(int stage) {
-		if (stage >= this.stages.size()) {
-			return 0;
+	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionState2 newExecutionState, String optionalMessage) {
+		for (ExecutionListener listener : this.executionListeners) {
+			try {
+				listener.executionStateChanged(jobID, vertexId, subtask, newExecutionState, optionalMessage);
+			}
+			catch (Throwable t) {
+				LOG.error("Notification of execution state change caused an error.", t);
+			}
 		}
-
-		return this.stages.get(stage).getNumberOfInputExecutionVertices();
 	}
-
-	/**
-	 * Returns the number of output vertices registered with this execution graph.
-	 * 
-	 * @return the number of output vertices registered with this execution graph
-	 */
-	public int getNumberOfOutputVertices() {
-		return this.stages.get(0).getNumberOfOutputExecutionVertices();
-	}
-
-	/**
-	 * Returns the number of output vertices for the given stage.
-	 * 
-	 * @param stage
-	 *        the index of the execution stage
-	 * @return the number of input vertices for the given stage
-	 */
-	public int getNumberOfOutputVertices(final int stage) {
-		if (stage >= this.stages.size()) {
-			return 0;
+	
+	// --------------------------------------------------------------------------------------------
+	//  Miscellaneous
+	// --------------------------------------------------------------------------------------------
+	
+	
+	public void execute(Runnable action) {
+		if (this.executor == null) {
+			throw new IllegalStateException("Executor has not been set.");
 		}
-
-		return this.stages.get(stage).getNumberOfOutputExecutionVertices();
-	}
-
-	/**
-	 * Returns the input vertex with the specified index.
-	 * 
-	 * @param index
-	 *        the index of the input vertex to return
-	 * @return the input vertex with the specified index or <code>null</code> if no input vertex with such an index
-	 *         exists
-	 */
-	public ExecutionVertex getInputVertex(final int index) {
-		return this.stages.get(0).getInputExecutionVertex(index);
-	}
-
-	/**
-	 * Returns the output vertex with the specified index.
-	 * 
-	 * @param index
-	 *        the index of the output vertex to return
-	 * @return the output vertex with the specified index or <code>null</code> if no output vertex with such an index
-	 *         exists
-	 */
-	public ExecutionVertex getOutputVertex(final int index) {
-		return this.stages.get(0).getOutputExecutionVertex(index);
-	}
-
-	/**
-	 * Returns the input vertex with the specified index for the given stage
-	 * 
-	 * @param stage
-	 *        the index of the stage
-	 * @param index
-	 *        the index of the input vertex to return
-	 * @return the input vertex with the specified index or <code>null</code> if no input vertex with such an index
-	 *         exists in that stage
-	 */
-	public ExecutionVertex getInputVertex(final int stage, final int index) {
-		try {
-			final ExecutionStage s = this.stages.get(stage);
-			if (s == null) {
-				return null;
-			}
-
-			return s.getInputExecutionVertex(index);
-
-		} catch (ArrayIndexOutOfBoundsException e) {
-			return null;
-		}
-	}
-
-	/**
-	 * Returns the output vertex with the specified index for the given stage.
-	 * 
-	 * @param stage
-	 *        the index of the stage
-	 * @param index
-	 *        the index of the output vertex to return
-	 * @return the output vertex with the specified index or <code>null</code> if no output vertex with such an index
-	 *         exists in that stage
-	 */
-	public ExecutionVertex getOutputVertex(final int stage, final int index) {
-		try {
-			final ExecutionStage s = this.stages.get(stage);
-			if (s == null) {
-				return null;
-			}
-
-			return s.getOutputExecutionVertex(index);
-
-		} catch (ArrayIndexOutOfBoundsException e) {
-			return null;
-		}
-	}
-
-	/**
-	 * Returns the execution stage with number <code>num</code>.
-	 * 
-	 * @param num
-	 *        the number of the execution stage to be returned
-	 * @return the execution stage with number <code>num</code> or <code>null</code> if no such execution stage exists
-	 */
-	public ExecutionStage getStage(final int num) {
-		try {
-			return this.stages.get(num);
-		} catch (ArrayIndexOutOfBoundsException e) {
-			return null;
-		}
-	}
-
-	/**
-	 * Returns the number of execution stages in the execution graph.
-	 * 
-	 * @return the number of execution stages in the execution graph
-	 */
-	public int getNumberOfStages() {
-		return this.stages.size();
-	}
-
-	/**
-	 * Identifies an execution by the specified channel ID and returns it.
-	 * 
-	 * @param id
-	 *        the channel ID to identify the vertex with
-	 * @return the execution vertex which has a channel with ID <code>id</code> or <code>null</code> if no such vertex
-	 *         exists in the execution graph
-	 */
-	public ExecutionVertex getVertexByChannelID(final ChannelID id) {
-		final ExecutionEdge edge = this.edgeMap.get(id);
-		if (edge == null) {
-			return null;
-		}
-
-		if (id.equals(edge.getOutputChannelID())) {
-			return edge.getOutputGate().getVertex();
-		}
-
-		return edge.getInputGate().getVertex();
-	}
-
-	/**
-	 * Finds an {@link ExecutionEdge} by its ID and returns it.
-	 * 
-	 * @param id
-	 *        the channel ID to identify the edge
-	 * @return the edge whose ID matches <code>id</code> or <code>null</code> if no such edge is known
-	 */
-	public ExecutionEdge getEdgeByID(final ChannelID id) {
-		return this.edgeMap.get(id);
-	}
-
-	/**
-	 * Registers an execution vertex with the execution graph.
-	 * 
-	 * @param vertex
-	 *        the execution vertex to register
-	 */
-	void registerExecutionVertex(final ExecutionVertex vertex) {
-		if (this.vertexMap.put(vertex.getID(), vertex) != null) {
-			throw new IllegalStateException("There is already an execution vertex with ID " + vertex.getID()
-				+ " registered");
-		}
-	}
-
-	/**
-	 * Returns the execution vertex with the given vertex ID.
-	 * 
-	 * @param id
-	 *        the vertex ID to retrieve the execution vertex
-	 * @return the execution vertex matching the provided vertex ID or <code>null</code> if no such vertex could be
-	 *         found
-	 */
-	public ExecutionVertex getVertexByID(final ExecutionVertexID id) {
-		return this.vertexMap.get(id);
-	}
-
-	/**
-	 * Checks if the current execution stage has been successfully completed, i.e.
-	 * all vertices in this stage have successfully finished their execution.
-	 * 
-	 * @return <code>true</code> if stage is completed, <code>false</code> otherwise
-	 */
-	private boolean isCurrentStageCompleted() {
-		if (this.indexToCurrentExecutionStage >= this.stages.size()) {
-			return true;
-		}
-
-		final ExecutionGraphIterator it = new ExecutionGraphIterator(this, this.indexToCurrentExecutionStage, true,
-			true);
-		while (it.hasNext()) {
-			final ExecutionVertex vertex = it.next();
-			if (vertex.getExecutionState() != ExecutionState.FINISHED) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Checks if the execution of execution graph is finished.
-	 * 
-	 * @return <code>true</code> if the execution of the graph is finished, <code>false</code> otherwise
-	 */
-	public boolean isExecutionFinished() {
-		return (getJobStatus() == InternalJobStatus.FINISHED);
-	}
-
-	/**
-	 * Returns the job ID of the job configuration this execution graph was originally constructed from.
-	 * 
-	 * @return the job ID of the job configuration this execution graph was originally constructed from
-	 */
-	public JobID getJobID() {
-		return this.jobID;
-	}
-
-	/**
-	 * Returns the index of the current execution stage.
-	 * 
-	 * @return the index of the current execution stage
-	 */
-	public int getIndexOfCurrentExecutionStage() {
-		return this.indexToCurrentExecutionStage;
-	}
-
-	/**
-	 * Returns the stage which is currently executed.
-	 * 
-	 * @return the currently executed stage or <code>null</code> if the job execution is already completed
-	 */
-	public ExecutionStage getCurrentExecutionStage() {
-
-		try {
-			return this.stages.get(this.indexToCurrentExecutionStage);
-		} catch (ArrayIndexOutOfBoundsException e) {
-			return null;
-		}
-	}
-
-	public void repairStages() {
-
-		final Map<ExecutionGroupVertex, Integer> stageNumbers = new HashMap<ExecutionGroupVertex, Integer>();
-		ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(this, true, -1);
-
-		while (it.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it.next();
-			int precedingNumber = 0;
-			if (stageNumbers.containsKey(groupVertex)) {
-				precedingNumber = stageNumbers.get(groupVertex).intValue();
-			} else {
-				stageNumbers.put(groupVertex, Integer.valueOf(precedingNumber));
-			}
-
-			for (int i = 0; i < groupVertex.getNumberOfForwardLinks(); i++) {
-
-				final ExecutionGroupEdge edge = groupVertex.getForwardEdge(i);
-				if (!stageNumbers.containsKey(edge.getTargetVertex())) {
-					// Target vertex has not yet been discovered
-					// Same stage as preceding vertex
-					stageNumbers.put(edge.getTargetVertex(), Integer.valueOf(precedingNumber));
-				} else {
-					final int stageNumber = stageNumbers.get(edge.getTargetVertex()).intValue();
-					if (stageNumber != precedingNumber) {
-						stageNumbers.put(edge.getTargetVertex(), (int) Math.max(precedingNumber, stageNumber));
-					}
-				}
-			}
-		}
-
-		// Traverse the graph backwards (starting from the output vertices) to make sure vertices are allocated in a
-		// stage as high as possible
-		it = new ExecutionGroupVertexIterator(this, false, -1);
-
-		while (it.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it.next();
-			final int succeedingNumber = stageNumbers.get(groupVertex);
-
-			for (int i = 0; i < groupVertex.getNumberOfBackwardLinks(); i++) {
-
-				final ExecutionGroupEdge edge = groupVertex.getBackwardEdge(i);
-				final int stageNumber = stageNumbers.get(edge.getSourceVertex());
-				if (stageNumber != succeedingNumber) {
-					throw new IllegalStateException(edge.getSourceVertex() + " and " + edge.getTargetVertex()
-						+ " are assigned to different stages");
-				}
-			}
-		}
-
-		// Finally, assign the new stage numbers
-		this.stages.clear();
-		final Iterator<Map.Entry<ExecutionGroupVertex, Integer>> it2 = stageNumbers.entrySet().iterator();
-		while (it2.hasNext()) {
-
-			final Map.Entry<ExecutionGroupVertex, Integer> entry = it2.next();
-			final ExecutionGroupVertex groupVertex = entry.getKey();
-			final int stageNumber = entry.getValue().intValue();
-			// Prevent out of bounds exceptions
-			while (this.stages.size() <= stageNumber) {
-				this.stages.add(null);
-			}
-			ExecutionStage executionStage = this.stages.get(stageNumber);
-			// If the stage not yet exists,
-			if (executionStage == null) {
-				executionStage = new ExecutionStage(this, stageNumber);
-				this.stages.set(stageNumber, executionStage);
-			}
-
-			executionStage.addStageMember(groupVertex);
-			groupVertex.setExecutionStage(executionStage);
-		}
-	}
-
-	public void repairInstanceSharing() {
-
-		final Set<AllocatedResource> availableResources = new LinkedHashSet<AllocatedResource>();
-
-		final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(this, true, -1);
-		while (it.hasNext()) {
-			final ExecutionGroupVertex groupVertex = it.next();
-			if (groupVertex.getVertexToShareInstancesWith() == null) {
-				availableResources.clear();
-				groupVertex.repairInstanceSharing(availableResources);
-			}
-		}
-	}
-
-	public void repairInstanceAssignment() {
-
-		Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
-		while (it.hasNext()) {
-
-			final ExecutionVertex sourceVertex = it.next();
-
-			for (int i = 0; i < sourceVertex.getNumberOfOutputGates(); ++i) {
-
-				final ExecutionGate outputGate = sourceVertex.getOutputGate(i);
-				final ChannelType channelType = outputGate.getChannelType();
-				if (channelType == ChannelType.IN_MEMORY) {
-					final int numberOfOutputChannels = outputGate.getNumberOfEdges();
-					for (int j = 0; j < numberOfOutputChannels; ++j) {
-						final ExecutionEdge outputChannel = outputGate.getEdge(j);
-						outputChannel.getInputGate().getVertex()
-							.setAllocatedResource(sourceVertex.getAllocatedResource());
-					}
-				}
-			}
-		}
-
-		it = new ExecutionGraphIterator(this, false);
-		while (it.hasNext()) {
-
-			final ExecutionVertex targetVertex = it.next();
-
-			for (int i = 0; i < targetVertex.getNumberOfInputGates(); ++i) {
-
-				final ExecutionGate inputGate = targetVertex.getInputGate(i);
-				final ChannelType channelType = inputGate.getChannelType();
-				if (channelType == ChannelType.IN_MEMORY) {
-					final int numberOfInputChannels = inputGate.getNumberOfEdges();
-					for (int j = 0; j < numberOfInputChannels; ++j) {
-						final ExecutionEdge inputChannel = inputGate.getEdge(j);
-						inputChannel.getOutputGate().getVertex()
-							.setAllocatedResource(targetVertex.getAllocatedResource());
-					}
-				}
-			}
-		}
-	}
-
-	public ChannelType getChannelType(final ExecutionVertex sourceVertex, final ExecutionVertex targetVertex) {
-
-		final ExecutionGroupVertex sourceGroupVertex = sourceVertex.getGroupVertex();
-		final ExecutionGroupVertex targetGroupVertex = targetVertex.getGroupVertex();
-
-		final List<ExecutionGroupEdge> edges = sourceGroupVertex.getForwardEdges(targetGroupVertex);
-		if (edges.size() == 0) {
-			return null;
-		}
-
-		// On a task level, the two vertices are connected
-		final ExecutionGroupEdge edge = edges.get(0);
-
-		// Now lets see if these two concrete subtasks are connected
-		final ExecutionGate outputGate = sourceVertex.getOutputGate(edge.getIndexOfOutputGate());
-		for (int i = 0; i < outputGate.getNumberOfEdges(); ++i) {
-
-			final ExecutionEdge outputChannel = outputGate.getEdge(i);
-			if (targetVertex == outputChannel.getInputGate().getVertex()) {
-				return edge.getChannelType();
-			}
-		}
-
-		return null;
-	}
-
-	/**
-	 * Returns the job configuration that was originally attached to the job graph.
-	 * 
-	 * @return the job configuration that was originally attached to the job graph
-	 */
-	public Configuration getJobConfiguration() {
-		return this.jobConfiguration;
-	}
-
-	/**
-	 * Checks whether the job represented by the execution graph has the status <code>FINISHED</code>.
-	 * 
-	 * @return <code>true</code> if the job has the status <code>CREATED</code>, <code>false</code> otherwise
-	 */
-	private boolean jobHasFinishedStatus() {
-
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
-
-		while (it.hasNext()) {
-
-			if (it.next().getExecutionState() != ExecutionState.FINISHED) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Checks whether the job represented by the execution graph has the status <code>SCHEDULED</code>.
-	 * 
-	 * @return <code>true</code> if the job has the status <code>SCHEDULED</code>, <code>false</code> otherwise
-	 */
-	private boolean jobHasScheduledStatus() {
-
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
-
-		while (it.hasNext()) {
-
-			final ExecutionState s = it.next().getExecutionState();
-			if (s != ExecutionState.CREATED && s != ExecutionState.SCHEDULED && s != ExecutionState.READY) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Checks whether the job represented by the execution graph has the status <code>CANCELED</code> or
-	 * <code>FAILED</code>.
-	 * 
-	 * @return <code>true</code> if the job has the status <code>CANCELED</code> or <code>FAILED</code>,
-	 *         <code>false</code> otherwise
-	 */
-	private boolean jobHasFailedOrCanceledStatus() {
-
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
-
-		while (it.hasNext()) {
-
-			final ExecutionState state = it.next().getExecutionState();
-
-			if (state != ExecutionState.CANCELED && state != ExecutionState.FAILED && state != ExecutionState.FINISHED) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	private static InternalJobStatus determineNewJobStatus(final ExecutionGraph eg,
-			final ExecutionState latestStateChange) {
-
-		final InternalJobStatus currentJobStatus = eg.getJobStatus();
-
-		switch (currentJobStatus) {
-		case CREATED:
-			if (eg.jobHasScheduledStatus()) {
-				return InternalJobStatus.SCHEDULED;
-			} else if (latestStateChange == ExecutionState.CANCELED) {
-				if (eg.jobHasFailedOrCanceledStatus()) {
-					return InternalJobStatus.CANCELED;
-				}
-			}else if(latestStateChange == ExecutionState.FAILED){
-				return InternalJobStatus.FAILING;
-			}
-			break;
-		case SCHEDULED:
-			if (latestStateChange == ExecutionState.RUNNING) {
-				return InternalJobStatus.RUNNING;
-			} else if (latestStateChange == ExecutionState.CANCELED) {
-				if (eg.jobHasFailedOrCanceledStatus()) {
-					return InternalJobStatus.CANCELED;
-				}
-			}else if(latestStateChange == ExecutionState.FAILED){
-				return InternalJobStatus.FAILING;
-			}
-			break;
-		case RUNNING:
-			if (latestStateChange == ExecutionState.CANCELED) {
-				return InternalJobStatus.CANCELING;
-			}
-			if (latestStateChange == ExecutionState.FAILED) {
-
-				final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
-				while (it.hasNext()) {
-
-					final ExecutionVertex vertex = it.next();
-					if (vertex.getExecutionState() == ExecutionState.FAILED) {
-						return InternalJobStatus.FAILING;
-					}
-				}
-			}
-			if (eg.jobHasFinishedStatus()) {
-				return InternalJobStatus.FINISHED;
-			}
-			break;
-		case FAILING:
-			if (eg.jobHasFailedOrCanceledStatus()) {
-				return InternalJobStatus.FAILED;
-			}
-			break;
-		case FAILED:
-			LOG.error("Received update of execute state in job status FAILED");
-			break;
-		case CANCELING:
-			if (eg.jobHasFailedOrCanceledStatus()) {
-				return InternalJobStatus.CANCELED;
-			}
-			break;
-		case CANCELED:
-			LOG.error("Received update of execute state in job status CANCELED: " + eg.getJobID());
-			break;
-		case FINISHED:
-			LOG.error("Received update of execute state in job status FINISHED: " + eg.getJobID() + " "
-				+ StringUtils.stringifyException(new Throwable()));
-			break;
-		}
-
-		return currentJobStatus;
-	}
-
-	/**
-	 * Returns the current status of the job
-	 * represented by this execution graph.
-	 * 
-	 * @return the current status of the job
-	 */
-	public InternalJobStatus getJobStatus() {
-
-		return this.jobStatus.get();
-	}
-
-
-	@Override
-	public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
-			final ExecutionState newExecutionState, String optionalMessage) {
-
-		// Do not use the parameter newExecutionState here as it may already be out-dated
-
-		final ExecutionVertex vertex = getVertexByID(vertexID);
-		if (vertex == null) {
-			LOG.error("Cannot find execution vertex with the ID " + vertexID);
-			return;
-		}
-
-		final ExecutionState actualExecutionState = vertex.getExecutionState();
-
-		final InternalJobStatus newJobStatus = determineNewJobStatus(this, actualExecutionState);
-
-		if (actualExecutionState == ExecutionState.FINISHED) {
-			// It is worth checking if the current stage has complete
-			if (this.isCurrentStageCompleted()) {
-				// Increase current execution stage
-				++this.indexToCurrentExecutionStage;
-
-				if (this.indexToCurrentExecutionStage < this.stages.size()) {
-					final Iterator<ExecutionStageListener> it = this.executionStageListeners.iterator();
-					final ExecutionStage nextExecutionStage = getCurrentExecutionStage();
-					while (it.hasNext()) {
-						it.next().nextExecutionStageEntered(jobID, nextExecutionStage);
-					}
-				}
-			}
-		}
-
-		updateJobStatus(newJobStatus, optionalMessage);
-	}
-
-	/**
-	 * Updates the job status to given status and triggers the execution of the {@link JobStatusListener} objects.
-	 * 
-	 * @param newJobStatus
-	 *        the new job status
-	 * @param optionalMessage
-	 *        an optional message providing details on the reasons for the state change
-	 */
-	public void updateJobStatus(final InternalJobStatus newJobStatus, String optionalMessage) {
-
-		// Check if the new job status equals the old one
-		if (this.jobStatus.getAndSet(newJobStatus) == newJobStatus) {
-			return;
-		}
-
-		// The task caused the entire job to fail, save the error description
-		if (newJobStatus == InternalJobStatus.FAILING) {
-			this.errorDescription = optionalMessage;
-		}
-
-		// If this is the final failure state change, reuse the saved error description
-		if (newJobStatus == InternalJobStatus.FAILED) {
-			optionalMessage = this.errorDescription;
-		}
-
-		final Iterator<JobStatusListener> it = this.jobStatusListeners.iterator();
-		while (it.hasNext()) {
-			it.next().jobStatusHasChanged(this, newJobStatus, optionalMessage);
-		}
-	}
-
-	/**
-	 * Registers a new {@link JobStatusListener} object with this execution graph.
-	 * After being registered the object will receive notifications about changes
-	 * of the job status. It is not possible to register the same listener object
-	 * twice.
-	 * 
-	 * @param jobStatusListener
-	 *        the listener object to register
-	 */
-	public void registerJobStatusListener(final JobStatusListener jobStatusListener) {
-
-		if (jobStatusListener == null) {
-			throw new IllegalArgumentException("Argument jobStatusListener must not be null");
-		}
-
-		this.jobStatusListeners.addIfAbsent(jobStatusListener);
-	}
-
-	/**
-	 * Unregisters the given {@link JobStatusListener} object. After having called this
-	 * method, the object will no longer receive notifications about changes of the job
-	 * status.
-	 * 
-	 * @param jobStatusListener
-	 *        the listener object to unregister
-	 */
-	public void unregisterJobStatusListener(final JobStatusListener jobStatusListener) {
-
-		if (jobStatusListener == null) {
-			throw new IllegalArgumentException("Argument jobStatusListener must not be null");
-		}
-
-		this.jobStatusListeners.remove(jobStatusListener);
-	}
-
-	/**
-	 * Registers a new {@link ExecutionStageListener} object with this execution graph. After being registered the
-	 * object will receive a notification whenever the job has entered its next execution stage. Note that a
-	 * notification is not sent when the job has entered its initial execution stage.
-	 * 
-	 * @param executionStageListener
-	 *        the listener object to register
-	 */
-	public void registerExecutionStageListener(final ExecutionStageListener executionStageListener) {
-
-		if (executionStageListener == null) {
-			throw new IllegalArgumentException("Argument executionStageListener must not be null");
-		}
-
-		this.executionStageListeners.addIfAbsent(executionStageListener);
-	}
-
-	/**
-	 * Unregisters the given {@link ExecutionStageListener} object. After having called this method, the object will no
-	 * longer receiver notifications about the execution stage progress.
-	 * 
-	 * @param executionStageListener
-	 *        the listener object to unregister
-	 */
-	public void unregisterExecutionStageListener(final ExecutionStageListener executionStageListener) {
-
-		if (executionStageListener == null) {
-			throw new IllegalArgumentException("Argument executionStageListener must not be null");
-		}
-
-		this.executionStageListeners.remove(executionStageListener);
-	}
-
-	/**
-	 * Returns the name of the original job graph.
-	 * 
-	 * @return the name of the original job graph, possibly <code>null</code>
-	 */
-	public String getJobName() {
-		return this.jobName;
-	}
-
-	@Override
-	public void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {}
-
-	@Override
-	public void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {}
-
-	/**
-	 * Reconstructs the execution pipelines for the entire execution graph.
-	 */
-	private void reconstructExecutionPipelines() {
-		final Iterator<ExecutionStage> it = this.stages.iterator();
-		while (it.hasNext()) {
-
-			it.next().reconstructExecutionPipelines();
-		}
-	}
-
-	/**
-	 * Returns an iterator over all execution stages contained in this graph.
-	 * 
-	 * @return an iterator over all execution stages contained in this graph
-	 */
-	public Iterator<ExecutionStage> iterator() {
-		return this.stages.iterator();
-	}
-
-	/**
-	 * Performs an asynchronous update operation to this execution graph.
-	 * 
-	 * @param command
-	 *        the update command to be asynchronously executed on this graph
-	 */
-	public void executeCommand(final Runnable command) {
-		this.executorService.execute(command);
+		
+		this.executor.submit(action);
 	}
+}
 	
-	private void calculateConnectionIDs() {
-		final Set<ExecutionGroupVertex> alreadyVisited = new HashSet<ExecutionGroupVertex>();
-		final ExecutionStage lastStage = getStage(getNumberOfStages() - 1);
-
-		for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) {
-
-			final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i);
-
-			int currentConnectionID = 0;
-
-			if (groupVertex.isOutputVertex()) {
-			currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited);
-			}
-		}
-	}
 	
-	/**
-	 * Retrieves the number of required slots to run this execution graph
-	 * @return
-	 */
-	public int getRequiredSlots(){
-		int maxRequiredSlots = 0;
-
-		final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
-
-		while(stageIterator.hasNext()){
-			final ExecutionStage stage = stageIterator.next();
-
-			int requiredSlots = stage.getRequiredSlots();
-
-			if(requiredSlots > maxRequiredSlots){
-				maxRequiredSlots = requiredSlots;
-			}
-		}
-
-		return maxRequiredSlots;
-	}
-}
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+	
+//	
+//	/**
+//	 * Applies the user defined settings to the execution graph.
+//	 * 
+//	 * @param temporaryGroupVertexMap
+//	 *        mapping between job vertices and the corresponding group vertices.
+//	 * @throws GraphConversionException
+//	 *         thrown if an error occurs while applying the user settings.
+//	 */
+//	private void applyUserDefinedSettings(final HashMap<AbstractJobVertex, ExecutionGroupVertex> temporaryGroupVertexMap)
+//			throws GraphConversionException {
+//
+//		// Cycles in the dependency chain for instance sharing are already checked for in the
+//		// <code>submitJob</code> method of the job manager
+//
+//		// If there is no cycle, apply the settings to the corresponding group vertices
+//		final Iterator<Map.Entry<AbstractJobVertex, ExecutionGroupVertex>> it = temporaryGroupVertexMap.entrySet()
+//			.iterator();
+//		while (it.hasNext()) {
+//
+//			final Map.Entry<AbstractJobVertex, ExecutionGroupVertex> entry = it.next();
+//			final AbstractJobVertex jobVertex = entry.getKey();
+//			if (jobVertex.getVertexToShareInstancesWith() != null) {
+//
+//				final AbstractJobVertex vertexToShareInstancesWith = jobVertex.getVertexToShareInstancesWith();
+//				final ExecutionGroupVertex groupVertex = entry.getValue();
+//				final ExecutionGroupVertex groupVertexToShareInstancesWith = temporaryGroupVertexMap
+//					.get(vertexToShareInstancesWith);
+//				groupVertex.shareInstancesWith(groupVertexToShareInstancesWith);
+//			}
+//		}
+//
+//		// Second, we create the number of execution vertices each group vertex is supposed to manage
+//		Iterator<ExecutionGroupVertex> it2 = new ExecutionGroupVertexIterator(this, true, -1);
+//		while (it2.hasNext()) {
+//
+//			final ExecutionGroupVertex groupVertex = it2.next();
+//			if (groupVertex.isNumberOfMembersUserDefined()) {
+//				groupVertex.createInitialExecutionVertices(groupVertex.getUserDefinedNumberOfMembers());
+//			}
+//		}
+//
+//		// Finally, apply the channel settings
+//		it2 = new ExecutionGroupVertexIterator(this, true, -1);
+//		while (it2.hasNext()) {
+//
+//			final ExecutionGroupVertex groupVertex = it2.next();
+//			for (int i = 0; i < groupVertex.getNumberOfForwardLinks(); i++) {
+//
+//				final ExecutionGroupEdge edge = groupVertex.getForwardEdge(i);
+//				if (edge.isChannelTypeUserDefined()) {
+//					edge.changeChannelType(edge.getChannelType());
+//				}
+//
+//				// Create edges between execution vertices
+//				createExecutionEdgesForGroupEdge(edge);
+//			}
+//		}
+//
+//		// Repair the instance assignment after having changed the channel types
+//		repairInstanceAssignment();
+//
+//		// Repair the instance sharing among different group vertices
+//		repairInstanceSharing();
+//
+//		// Finally, repair the stages
+//		repairStages();
+//	}
+//
+//	/**
+//	 * Sets up an execution graph from a job graph.
+//	 * 
+//	 * @param jobGraph
+//	 *        the job graph to create the execution graph from
+//	 * @param defaultParallelism
+//	 *        defaultParallelism in case that nodes have no parallelism set
+//	 * @throws GraphConversionException
+//	 *         thrown if the job graph is not valid and no execution graph can be constructed from it
+//	 */
+//	private void constructExecutionGraph(final JobGraph jobGraph, final int defaultParallelism)
+//			throws GraphConversionException {
+//
+//		// Create temporary data structures
+//		final HashMap<AbstractJobVertex, ExecutionVertex> temporaryVertexMap = new HashMap<AbstractJobVertex, ExecutionVertex>();
+//		final HashMap<AbstractJobVertex, ExecutionGroupVertex> temporaryGroupVertexMap = new HashMap<AbstractJobVertex, ExecutionGroupVertex>();
+//
+//		// Initially, create only one execution stage that contains all group vertices
+//		final ExecutionStage initialExecutionStage = new ExecutionStage(this, 0);
+//		this.stages.add(initialExecutionStage);
+//
+//		// Convert job vertices to execution vertices and initialize them
+//		final AbstractJobVertex[] all = jobGraph.getAllJobVertices();
+//		for (int i = 0; i < all.length; i++) {
+//			if(all[i].getNumberOfSubtasks() == -1){
+//				all[i].setNumberOfSubtasks(defaultParallelism);
+//			}
+//
+//			final ExecutionVertex createdVertex = createVertex(all[i], initialExecutionStage);
+//			temporaryVertexMap.put(all[i], createdVertex);
+//			temporaryGroupVertexMap.put(all[i], createdVertex.getGroupVertex());
+//		}
+//
+//		// Create initial edges between the vertices
+//		createInitialGroupEdges(temporaryVertexMap);
+//
+//		// Now that an initial graph is built, apply the user settings
+//		applyUserDefinedSettings(temporaryGroupVertexMap);
+//
+//		// Calculate the connection IDs
+//		calculateConnectionIDs();
+//
+//		// Finally, construct the execution pipelines
+//		reconstructExecutionPipelines();
+//	}
+//
+//	private void createExecutionEdgesForGroupEdge(final ExecutionGroupEdge groupEdge) {
+//
+//		final ExecutionGroupVertex source = groupEdge.getSourceVertex();
+//		final int indexOfOutputGate = groupEdge.getIndexOfOutputGate();
+//		final ExecutionGroupVertex target = groupEdge.getTargetVertex();
+//		final int indexOfInputGate = groupEdge.getIndexOfInputGate();
+//
+//		final Map<GateID, List<ExecutionEdge>> inputChannelMap = new HashMap<GateID, List<ExecutionEdge>>();
+//
+//		// Unwire the respective gate of the source vertices
+//		final int currentNumberOfSourceNodes = source.getCurrentNumberOfGroupMembers();
+//		for (int i = 0; i < currentNumberOfSourceNodes; ++i) {
+//
+//			final ExecutionVertex sourceVertex = source.getGroupMember(i);
+//			final ExecutionGate outputGate = sourceVertex.getOutputGate(indexOfOutputGate);
+//			if (outputGate == null) {
+//				throw new IllegalStateException("wire: " + sourceVertex.getName()
+//					+ " has no output gate with index " + indexOfOutputGate);
+//			}
+//
+//			if (outputGate.getNumberOfEdges() > 0) {
+//				throw new IllegalStateException("wire: wire called on source " + sourceVertex.getName() + " (" + i
+//					+ "), but number of output channels is " + outputGate.getNumberOfEdges() + "!");
+//			}
+//
+//			final int currentNumberOfTargetNodes = target.getCurrentNumberOfGroupMembers();
+//			final List<ExecutionEdge> outputChannels = new ArrayList<ExecutionEdge>();
+//
+//			for (int j = 0; j < currentNumberOfTargetNodes; ++j) {
+//
+//				final ExecutionVertex targetVertex = target.getGroupMember(j);
+//				final ExecutionGate inputGate = targetVertex.getInputGate(indexOfInputGate);
+//				if (inputGate == null) {
+//					throw new IllegalStateException("wire: " + targetVertex.getName()
+//						+ " has no input gate with index " + indexOfInputGate);
+//				}
+//
+//				if (inputGate.getNumberOfEdges() > 0 && i == 0) {
+//					throw new IllegalStateException("wire: wire called on target " + targetVertex.getName() + " ("
+//						+ j + "), but number of input channels is " + inputGate.getNumberOfEdges() + "!");
+//				}
+//
+//				// Check if a wire is supposed to be created
+//				if (DistributionPatternProvider.createWire(groupEdge.getDistributionPattern(),
+//					i, j, currentNumberOfSourceNodes, currentNumberOfTargetNodes)) {
+//
+//					final ChannelID outputChannelID = new ChannelID();
+//					final ChannelID inputChannelID = new ChannelID();
+//
+//					final ExecutionEdge edge = new ExecutionEdge(outputGate, inputGate, groupEdge, outputChannelID,
+//						inputChannelID, outputGate.getNumberOfEdges(), inputGate.getNumberOfEdges());
+//
+//					this.edgeMap.put(outputChannelID, edge);
+//					this.edgeMap.put(inputChannelID, edge);
+//
+//					outputChannels.add(edge);
+//
+//					List<ExecutionEdge> inputChannels = inputChannelMap.get(inputGate.getGateID());
+//					if (inputChannels == null) {
+//						inputChannels = new ArrayList<ExecutionEdge>();
+//						inputChannelMap.put(inputGate.getGateID(), inputChannels);
+//					}
+//
+//					inputChannels.add(edge);
+//				}
+//			}
+//
+//			outputGate.replaceAllEdges(outputChannels);
+//		}
+//
+//		// Finally, set the channels for the input gates
+//		final int currentNumberOfTargetNodes = target.getCurrentNumberOfGroupMembers();
+//		for (int i = 0; i < currentNumberOfTargetNodes; ++i) {
+//
+//			final ExecutionVertex targetVertex = target.getGroupMember(i);
+//			final ExecutionGate inputGate = targetVertex.getInputGate(indexOfInputGate);
+//
+//			final List<ExecutionEdge> inputChannels = inputChannelMap.get(inputGate.getGateID());
+//			if (inputChannels == null) {
+//				LOG.error("Cannot find input channels for gate ID " + inputGate.getGateID());
+//				continue;
+//			}
+//
+//			inputGate.replaceAllEdges(inputChannels);
+//		}
+//
+//	}
+//
+//	/**
+//	 * Creates the initial edges between the group vertices
+//	 * 
+//	 * @param vertexMap
+//	 *        the temporary vertex map
+//	 * @throws GraphConversionException
+//	 *         if the initial wiring cannot be created
+//	 */
+//	private void createInitialGroupEdges(final HashMap<AbstractJobVertex, ExecutionVertex> vertexMap)
+//			throws GraphConversionException {
+//
+//		Iterator<Map.Entry<AbstractJobVertex, ExecutionVertex>> it = vertexMap.entrySet().iterator();
+//
+//		while (it.hasNext()) {
+//
+//			final Map.Entry<AbstractJobVertex, ExecutionVertex> entry = it.next();
+//			final AbstractJobVertex sjv = entry.getKey();
+//			final ExecutionVertex sev = entry.getValue();
+//			final ExecutionGroupVertex sgv = sev.getGroupVertex();
+//
+//			// First, build the group edges
+//			for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
+//				final JobEdge edge = sjv.getForwardConnection(i);
+//				final AbstractJobVertex tjv = edge.getConnectedVertex();
+//
+//				final ExecutionVertex tev = vertexMap.get(tjv);
+//				final ExecutionGroupVertex tgv = tev.getGroupVertex();
+//				// Use NETWORK as default channel type if nothing else is defined by the user
+//				ChannelType channelType = edge.getChannelType();
+//				boolean userDefinedChannelType = true;
+//				if (channelType == null) {
+//					userDefinedChannelType = false;
+//					channelType = ChannelType.NETWORK;
+//				}
+//
+//				final DistributionPattern distributionPattern = edge.getDistributionPattern();
+//
+//				// Connect the corresponding group vertices and copy the user settings from the job edge
+//				final ExecutionGroupEdge groupEdge = sgv.wireTo(tgv, edge.getIndexOfInputGate(), i, channelType,
+//					userDefinedChannelType,distributionPattern);
+//
+//				final ExecutionGate outputGate = new ExecutionGate(new GateID(), sev, groupEdge, false);
+//				sev.insertOutputGate(i, outputGate);
+//				final ExecutionGate inputGate = new ExecutionGate(new GateID(), tev, groupEdge, true);
+//				tev.insertInputGate(edge.getIndexOfInputGate(), inputGate);
+//			}
+//		}
+//	}
+//
+//	/**
+//	 * Creates an execution vertex from a job vertex.
+//	 * 
+//	 * @param jobVertex
+//	 *        the job vertex to create the execution vertex from
+//	 * @param initialExecutionStage
+//	 *        the initial execution stage all group vertices are added to
+//	 * @return the new execution vertex
+//	 * @throws GraphConversionException
+//	 *         thrown if the job vertex is of an unknown subclass
+//	 */
+//	private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final ExecutionStage initialExecutionStage)
+//			throws GraphConversionException {
+//
+//		// Create an initial execution vertex for the job vertex
+//		final Class<? extends AbstractInvokable> invokableClass = jobVertex.getInvokableClass();
+//		if (invokableClass == null) {
+//			throw new GraphConversionException("JobVertex " + jobVertex.getID() + " (" + jobVertex.getName()
+//				+ ") does not specify a task");
+//		}
+//
+//		// Calculate the cryptographic signature of this vertex
+//		final ExecutionSignature signature = ExecutionSignature.createSignature(jobVertex.getInvokableClass(),
+//			jobVertex.getJobGraph().getJobID());
+//
+//		// Create a group vertex for the job vertex
+//
+//		ExecutionGroupVertex groupVertex = null;
+//		try {
+//			groupVertex = new ExecutionGroupVertex(jobVertex.getName(), jobVertex.getID(), this,
+//				jobVertex.getNumberOfSubtasks(), jobVertex.getVertexToShareInstancesWith() != null ? true
+//					: false, 0, jobVertex.getConfiguration(), signature,
+//				invokableClass);
+//		} catch (Throwable t) {
+//			throw new GraphConversionException(t);
+//		}
+//
+//		// Register input and output vertices separately
+//		if (jobVertex instanceof AbstractJobInputVertex) {
+//
+//			final AbstractJobInputVertex jobInputVertex = (AbstractJobInputVertex) jobVertex;
+//			
+//			if (jobVertex instanceof InputFormatVertex) {
+//				try {
+//					// get a handle to the user code class loader
+//					ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
+//					
+//					((InputFormatVertex) jobVertex).initializeInputFormatFromTaskConfig(cl);
+//				}
+//				catch (Throwable t) {
+//					throw new GraphConversionException("Could not deserialize input format.", t);
+//				}
+//			}
+//			
+//			final Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
+//			
+//			InputSplit[] inputSplits;
+//
+//			try {
+//				inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
+//			}
+//			catch (Throwable t) {
+//				throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), t);
+//			}
+//
+//			if (inputSplits == null) {
+//				inputSplits = new InputSplit[0];
+//			}
+//			
+//			LOG.info("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length + " input splits");
+//
+//			// assign input splits and type
+//			groupVertex.setInputSplits(inputSplits);
+//			groupVertex.setInputSplitType(inputSplitType);
+//		}
+//
+//		if (jobVertex instanceof OutputFormatVertex){
+//			final OutputFormatVertex jobOutputVertex = (OutputFormatVertex) jobVertex;
+//			
+//			try {
+//				// get a handle to the user code class loader
+//				ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
+//				jobOutputVertex.initializeOutputFormatFromTaskConfig(cl);
+//			}
+//			catch (Throwable t) {
+//				throw new GraphConversionException("Could not deserialize output format.", t);
+//			}
+//
+//			OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
+//			if (outputFormat != null && outputFormat instanceof InitializeOnMaster){
+//				try {
+//					((InitializeOnMaster) outputFormat).initializeGlobal(jobVertex.getNumberOfSubtasks());
+//				}
+//				catch (Throwable t) {
+//					throw new GraphConversionException(t);
+//				}
+//			}
+//		}
+//
+//		// Add group vertex to initial execution stage
+//		initialExecutionStage.addStageMember(groupVertex);
+//
+//		final ExecutionVertex ev = new ExecutionVertex(this, groupVertex, jobVertex.getNumberOfForwardConnections(),
+//			jobVertex.getNumberOfBackwardConnections());
+//
+//		// Assign initial instance to vertex (may be overwritten later on when user settings are applied)
+//		ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(), null));
+//
+//		return ev;
+//	}
+//
+//	/**
+//	 * Returns the number of input vertices registered with this execution graph.
+//	 * 
+//	 * @return the number of input vertices registered with this execution graph
+//	 */
+//	public int getNumberOfInputVertices() {
+//		return this.stages.get(0).getNumberOfInputExecutionVertices();
+//	}
+//
+//	/**
+//	 * Returns the number of input vertices for the given stage.
+//	 * 
+//	 * @param stage
+//	 *        the index of the execution stage
+//	 * @return the number of input vertices for the given stage
+//	 */
+//	public int getNumberOfInputVertices(int stage) {
+//		if (stage >= this.stages.size()) {
+//			return 0;
+//		}
+//
+//		return this.stages.get(stage).getNumberOfInputExecutionVertices();
+//	}
+//
+//	/**
+//	 * Returns the number of output vertices registered with this execution graph.
+//	 * 
+//	 * @return the number of output vertices registered with this execution graph
+//	 */
+//	public int getNumberOfOutputVertices() {
+//		return this.stages.get(0).getNumberOfOutputExecutionVertices();
+//	}
+//
+//	/**
+//	 * Returns the number of output vertices for the given stage.
+//	 * 
+//	 * @param stage
+//	 *        the index of the execution stage
+//	 * @return the number of output vertices for the given stage
+//	 */
+//	public int getNumberOfOutputVertices(final int stage) {
+//		if (stage >= this.stages.size()) {
+//			return 0;
+//		}
+//
+//		return this.stages.get(stage).getNumberOfOutputExecutionVertices();
+//	}
+//
+//	/**
+//	 * Returns the input vertex with the specified index.
+//	 * 
+//	 * @param index
+//	 *        the index of the input vertex to return
+//	 * @return the input vertex with the specified index or <code>null</code> if no input vertex with such an index
+//	 *         exists
+//	 */
+//	public ExecutionVertex getInputVertex(final int index) {
+//		return this.stages.get(0).getInputExecutionVertex(index);
+//	}
+//
+//	/**
+//	 * Returns the output vertex with the specified index.
+//	 * 
+//	 * @param index
+//	 *        the index of the output vertex to return
+//	 * @return the output vertex with the specified index or <code>null</code> if no output vertex with such an index
+//	 *         exists
+//	 */
+//	public ExecutionVertex getOutputVertex(final int index) {
+//		return this.stages.get(0).getOutputExecutionVertex(index);
+//	}
+//
+//	/**
+//	 * Returns the input vertex with the specified index for the given stage
+//	 * 
+//	 * @param stage
+//	 *        the index of the stage
+//	 * @param index
+//	 *        the index of the input vertex to return
+//	 * @return the input vertex with the specified index or <code>null</code> if no input vertex with such an index
+//	 *         exists in that stage
+//	 */
+//	public ExecutionVertex getInputVertex(final int stage, final int index) {
+//		try {
+//			final ExecutionStage s = this.stages.get(stage);
+//			if (s == null) {
+//				return null;
+//			}
+//
+//			return s.getInputExecutionVertex(index);
+//
+//		} catch (ArrayIndexOutOfBoundsException e) {
+//			return null;
+//		}
+//	}
+//
+//	/**
+//	 * Returns the output vertex with the specified index for the given stage.
+//	 * 
+//	 * @param stage
+//	 *        the index of the stage
+//	 * @param index
+//	 *        the index of the output vertex to return
+//	 * @return the output vertex with the specified index or <code>null</code> if no output vertex with such an index
+//	 *         exists in that stage
+//	 */
+//	public ExecutionVertex getOutputVertex(final int stage, final int index) {
+//		try {
+//			final ExecutionStage s = this.stages.get(stage);
+//			if (s == null) {
+//				return null;
+//			}
+//
+//			return s.getOutputExecutionVertex(index);
+//
+//		} catch (ArrayIndexOutOfBoundsException e) {
+//			return null;
+//		}
+//	}
+//
+//	/**
+//	 * Identifies an execution by the specified channel ID and returns it.
+//	 * 
+//	 * @param id
+//	 *        the channel ID to identify the vertex with
+//	 * @return the execution vertex which has a channel with ID <code>id</code> or <code>null</code> if no such vertex
+//	 *         exists in the execution graph
+//	 */
+//	public ExecutionVertex getVertexByChannelID(final ChannelID id) {
+//		final ExecutionEdge edge = this.edgeMap.get(id);
+//		if (edge == null) {
+//			return null;
+//		}
+//
+//		if (id.equals(edge.getOutputChannelID())) {
+//			return edge.getOutputGate().getVertex();
+//		}
+//
+//		return edge.getInputGate().getVertex();
+//	}
+//
+//	/**
+//	 * Finds an {@link ExecutionEdge} by its ID and returns it.
+//	 * 
+//	 * @param id
+//	 *        the channel ID to identify the edge
+//	 * @return the edge whose ID matches <code>id</code> or <code>null</code> if no such edge is known
+//	 */
+//	public ExecutionEdge getEdgeByID(final ChannelID id) {
+//		return this.edgeMap.get(id);
+//	}
+//
+//	/**
+//	 * Registers an execution vertex with the execution graph.
+//	 * 
+//	 * @param vertex
+//	 *        the execution vertex to register
+//	 */
+//	void registerExecutionVertex(final ExecutionVertex vertex) {
+//		if (this.vertexMap.put(vertex.getID(), vertex) != null) {
+//			throw new IllegalStateException("There is already an execution vertex with ID " + vertex.getID()
+//				+ " registered");
+//		}
+//	}
+//
+//	/**
+//	 * Returns the execution vertex with the given vertex ID.
+//	 * 
+//	 * @param id
+//	 *        the vertex ID to retrieve the execution vertex
+//	 * @return the execution vertex matching the provided vertex ID or <code>null</code> if no such vertex could be
+//	 *         found
+//	 */
+//	public ExecutionVertex getVertexByID(final ExecutionVertexID id) {
+//		return this.vertexMap.get(id);
+//	}
+//
+//	/**
+//	 * Checks if the current execution stage has been successfully completed, i.e.
+//	 * all vertices in this stage have successfully finished their execution.
+//	 * 
+//	 * @return <code>true</code> if stage is completed, <code>false</code> otherwise
+//	 */
+//	private boolean isCurrentStageCompleted() {
+//		if (this.indexToCurrentExecutionStage >= this.stages.size()) {
+//			return true;
+//		}
+//
+//		final ExecutionGraphIterator it = new ExecutionGraphIterator(this, this.indexToCurrentExecutionStage, true,
+//			true);
+//		while (it.hasNext()) {
+//			final ExecutionVertex vertex = it.next();
+//			if (vertex.getExecutionState() != ExecutionState.FINISHED) {
+//				return false;
+//			}
+//		}
+//
+//		return true;
+//	}
+//
+//	/**
+//	 * Checks if the execution of the execution graph is finished.
+//	 * 
+//	 * @return <code>true</code> if the execution of the graph is finished, <code>false</code> otherwise
+//	 */
+//	public boolean isExecutionFinished() {
+//		return (getJobStatus() == InternalJobStatus.FINISHED);
+//	}
+//
+//	/**
+//	 * Returns the job ID of the job configuration this execution graph was originally constructed from.
+//	 * 
+//	 * @return the job ID of the job configuration this execution graph was originally constructed from
+//	 */
+//	public JobID getJobID() {
+//		return this.jobID;
+//	}
+//
+//	/**
+//	 * Returns the index of the current execution stage.
+//	 * 
+//	 * @return the index of the current execution stage
+//	 */
+//	public int getIndexOfCurrentExecutionStage() {
+//		return this.indexToCurrentExecutionStage;
+//	}
+//
+//	/**
+//	 * Returns the stage which is currently executed.
+//	 * 
+//	 * @return the currently executed stage or <code>null</code> if the job execution is already completed
+//	 */
+//	public ExecutionStage getCurrentExecutionStage() {
+//
+//		try {
+//			return this.stages.get(this.indexToCurrentExecutionStage);
+//		} catch (ArrayIndexOutOfBoundsException e) {
+//			return null;
+//		}
+//	}
+//
+//	public void repairStages() {
+//
+//		final Map<ExecutionGroupVertex, Integer> stageNumbers = new HashMap<ExecutionGroupVertex, Integer>();
+//		ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(this, true, -1);
+//
+//		while (it.hasNext()) {
+//
+//			final ExecutionGroupVertex groupVertex = it.next();
+//			int precedingNumber = 0;
+//			if (stageNumbers.containsKey(groupVertex)) {
+//				precedingNumber = stageNumbers.get(groupVertex).intValue();
+//			} else {
+//				stageNumbers.put(groupVertex, Integer.valueOf(precedingNumber));
+//			}
+//
+//			for (int i = 0; i < groupVertex.getNumberOfForwardLinks(); i++) {
+//
+//				final ExecutionGroupEdge edge = groupVertex.getForwardEdge(i);
+//				if (!stageNumbers.containsKey(edge.getTargetVertex())) {
+//					// Target vertex has not yet been discovered
+//					// Same stage as preceding vertex
+//					stageNumbers.put(edge.getTargetVertex(), Integer.valueOf(precedingNumber));
+//				} else {
+//					final int stageNumber = stageNumbers.get(edge.getTargetVertex()).intValue();
+//					if (stageNumber != precedingNumber) {
+//						stageNumbers.put(edge.getTargetVertex(), (int) Math.max(precedingNumber, stageNumber));
+//					}
+//				}
+//			}
+//		}
+//
+//		// Traverse the graph backwards (starting from the output vertices) to make sure vertices are allocated in a
+//		// stage as high as possible
+//		it = new ExecutionGroupVertexIterator(this, false, -1);
+//
+//		while (it.hasNext()) {
+//
+//			final ExecutionGroupVertex groupVertex = it.next();
+//			final int succeedingNumber = stageNumbers.get(groupVertex);
+//
+//			for (int i = 0; i < groupVertex.getNumberOfBackwardLinks(); i++) {
+//
+//				final ExecutionGroupEdge edge = groupVertex.getBackwardEdge(i);
+//				final int stageNumber = stageNumbers.get(edge.getSourceVertex());
+//				if (stageNumber != succeedingNumber) {
+//					throw new IllegalStateException(edge.getSourceVertex() + " and " + edge.getTargetVertex()
+//						+ " are assigned to different stages");
+//				}
+//			}
+//		}
+//
+//		// Finally, assign the new stage numbers
+//		this.stages.clear();
+//		final Iterator<Map.Entry<ExecutionGroupVertex, Integer>> it2 = stageNumbers.entrySet().iterator();
+//		while (it2.hasNext()) {
+//
+//			final Map.Entry<ExecutionGroupVertex, Integer> entry = it2.next();
+//			final ExecutionGroupVertex groupVertex = entry.getKey();
+//			final int stageNumber = entry.getValue().intValue();
+//			// Prevent out of bounds exceptions
+//			while (this.stages.size() <= stageNumber) {
+//				this.stages.add(null);
+//			}
+//			ExecutionStage executionStage = this.stages.get(stageNumber);
+//			// If the stage does not exist yet, create it
+//			if (executionStage == null) {
+//				executionStage = new ExecutionStage(this, stageNumber);
+//				this.stages.set(stageNumber, executionStage);
+//			}
+//
+//			executionStage.addStageMember(groupVertex);
+//			groupVertex.setExecutionStage(executionStage);
+//		}
+//	}
+//
+//	public void repairInstanceSharing() {
+//
+//		final Set<AllocatedResource> availableResources = new LinkedHashSet<AllocatedResource>();
+//
+//		final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(this, true, -1);
+//		while (it.hasNext()) {
+//			final ExecutionGroupVertex groupVertex = it.next();
+//			if (groupVertex.getVertexToShareInstancesWith() == null) {
+//				availableResources.clear();
+//				groupVertex.repairInstanceSharing(availableResources);
+//			}
+//		}
+//	}
+//
+//	public void repairInstanceAssignment() {
+//
+//		Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
+//		while (it.hasNext()) {
+//
+//			final ExecutionVertex sourceVertex = it.next();
+//
+//			for (int i = 0; i < sourceVertex.getNumberOfOutputGates(); ++i) {
+//
+//				final ExecutionGate outputGate = sourceVertex.getOutputGate(i);
+//				final ChannelType channelType = outputGate.getChannelType();
+//				if (channelType == ChannelType.IN_MEMORY) {
+//					final int numberOfOutputChannels = outputGate.getNumberOfEdges();
+//					for (int j = 0; j < numberOfOutputChannels; ++j) {
+//						final ExecutionEdge outputChannel = outputGate.getEdge(j);
+//						outputChannel.getInputGate().getVertex()
+//							.setAllocatedResource(sourceVertex.getAllocatedResource());
+//					}
+//				}
+//			}
+//		}
+//
+//		it = new ExecutionGraphIterator(this, false);
+//		while (it.hasNext()) {
+//
+//			final ExecutionVertex targetVertex = it.next();
+//
+//			for (int i = 0; i < targetVertex.getNumberOfInputGates(); ++i) {
+//
+//				final ExecutionGate inputGate = targetVertex.getInputGate(i);
+//				final ChannelType channelType = inputGate.getChannelType();
+//				if (channelType == ChannelType.IN_MEMORY) {
+//					final int numberOfInputChannels = inputGate.getNumberOfEdges();
+//					for (int j = 0; j < numberOfInputChannels; ++j) {
+//						final ExecutionEdge inputChannel = inputGate.getEdge(j);
+//						inputChannel.getOutputGate().getVertex()
+//							.setAllocatedResource(targetVertex.getAllocatedResource());
+//					}
+//				}
+//			}
+//		}
+//	}
+//
+//	public ChannelType getChannelType(final ExecutionVertex sourceVertex, final ExecutionVertex targetVertex) {
+//
+//		final ExecutionGroupVertex sourceGroupVertex = sourceVertex.getGroupVertex();
+//		final ExecutionGroupVertex targetGroupVertex = targetVertex.getGroupVertex();
+//
+//		final List<ExecutionGroupEdge> edges = sourceGroupVertex.getForwardEdges(targetGroupVertex);
+//		if (edges.size() == 0) {
+//			return null;
+//		}
+//
+//		// On a task level, the two vertices are connected
+//		final ExecutionGroupEdge edge = edges.get(0);
+//
+//		// Now let's see if these two concrete subtasks are connected
+//		final ExecutionGate outputGate = sourceVertex.getOutputGate(edge.getIndexOfOutputGate());
+//		for (int i = 0; i < outputGate.getNumberOfEdges(); ++i) {
+//
+//			final ExecutionEdge outputChannel = outputGate.getEdge(i);
+//			if (targetVertex == outputChannel.getInputGate().getVertex()) {
+//				return edge.getChannelType();
+//			}
+//		}
+//
+//		return null;
+//	}
+//
+//	/**
+//	 * Returns the job configuration that was originally attached to the job graph.
+//	 * 
+//	 * @return the job configuration that was originally attached to the job graph
+//	 */
+//	public Configuration getJobConfiguration() {
+//		return this.jobConfiguration;
+//	}
+//
+//	/**
+//	 * Checks whether the job represented by the execution graph has the status <code>FINISHED</code>.
+//	 * 
+//	 * @return <code>true</code> if the job has the status <code>FINISHED</code>, <code>false</code> otherwise
+//	 */
+//	private boolean jobHasFinishedStatus() {
+//
+//		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
+//
+//		while (i

<TRUNCATED>

Mime
View raw message