flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/9] flink git commit: [FLINK-2111] Add "stop" signal to cleanly shutdown streaming jobs
Date Mon, 15 Feb 2016 15:26:00 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
index ab61ee7..c7dc0bc 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
@@ -33,6 +33,7 @@ limitations under the License.
       {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
   <div ng-if="job.duration &gt; -1" title="{{job.duration | humanizeDuration:false}}" class="navbar-info last first">{{job.duration | humanizeDuration:true}}</div>
   <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div>
+  <div ng-if="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')" class="navbar-info last first"><span ng-click="stopJob($event)" class="navbar-info-button btn btn-default">Stop</span></div>
 </nav>
 <nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional">
   <ul class="nav nav-tabs">

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java
new file mode 100644
index 0000000..6bb71ce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+/**
+ * Exception indicating that a stop request was issued for a job that is not
+ * stoppable (i.e. not all of its source tasks implement the stoppable contract).
+ */
+public class StoppingException extends Exception {
+
+	private static final long serialVersionUID = -721315728140810694L;
+
+	/**
+	 * Creates a new {@code StoppingException} with the given message.
+	 *
+	 * @param msg the detail message
+	 */
+	public StoppingException(String msg) {
+		super(msg);
+	}
+
+	/**
+	 * Creates a new {@code StoppingException} with the given message and cause.
+	 *
+	 * @param message the detail message
+	 * @param cause the root cause of this exception
+	 */
+	public StoppingException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index db037bb..bc75664 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -77,6 +77,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import static org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
 import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
@@ -106,11 +107,13 @@ public class Execution implements Serializable {
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
-	
+
 	private static final Logger LOG = ExecutionGraph.LOG;
-	
+
 	private static final int NUM_CANCEL_CALL_TRIES = 3;
 
+	private static final int NUM_STOP_CALL_TRIES = 3;
+
 	// --------------------------------------------------------------------------------------------
 
 	private final ExecutionVertex vertex;
@@ -126,13 +129,13 @@ public class Execution implements Serializable {
 	private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
 	private volatile ExecutionState state = CREATED;
-	
+
 	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
-	
+
 	private volatile Throwable failureCause;          // once assigned, never changes
-	
+
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
-	
+
 	private SerializedValue<StateHandle<?>> operatorState;
 	
 	private long recoveryTimestamp;
@@ -162,7 +165,7 @@ public class Execution implements Serializable {
 
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
-		
+
 		this.attemptNumber = attemptNumber;
 
 		this.stateTimestamps = new long[ExecutionState.values().length];
@@ -172,7 +175,7 @@ public class Execution implements Serializable {
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//   Properties
 	// --------------------------------------------------------------------------------------------
@@ -200,23 +203,23 @@ public class Execution implements Serializable {
 	public InstanceConnectionInfo getAssignedResourceLocation() {
 		return assignedResourceLocation;
 	}
-	
+
 	public Throwable getFailureCause() {
 		return failureCause;
 	}
-	
+
 	public long[] getStateTimestamps() {
 		return stateTimestamps;
 	}
-	
+
 	public long getStateTimestamp(ExecutionState state) {
 		return this.stateTimestamps[state.ordinal()];
 	}
-	
+
 	public boolean isFinished() {
 		return state == FINISHED || state == FAILED || state == CANCELED;
 	}
-	
+
 	/**
 	 * This method cleans fields that are irrelevant for the archived execution attempt.
 	 */
@@ -231,7 +234,7 @@ public class Execution implements Serializable {
 		partialInputChannelDeploymentDescriptors.clear();
 		partialInputChannelDeploymentDescriptors = null;
 	}
-	
+
 	public void setInitialState(SerializedValue<StateHandle<?>> initialState, long recoveryTimestamp) {
 		if (state != ExecutionState.CREATED) {
 			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
@@ -239,11 +242,11 @@ public class Execution implements Serializable {
 		this.operatorState = initialState;
 		this.recoveryTimestamp = recoveryTimestamp;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
 	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
@@ -355,14 +358,14 @@ public class Execution implements Serializable {
 				slot.releaseSlot();
 				return;
 			}
-			
+
 			if (LOG.isInfoEnabled()) {
 				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
 						attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
 			}
 
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, recoveryTimestamp, attemptNumber);
-			
+
 			// register this execution at the execution graph, to receive call backs
 			vertex.getExecutionGraph().registerExecution(this);
 
@@ -378,10 +381,10 @@ public class Execution implements Serializable {
 					if (failure != null) {
 						if (failure instanceof TimeoutException) {
 							String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
-							
+
 							markFailed(new Exception(
 									"Cannot deploy task " + taskname + " - TaskManager (" + instance
-											+ ") not responding after a timeout of " + timeout, failure));
+									+ ") not responding after a timeout of " + timeout, failure));
 						}
 						else {
 							markFailed(failure);
@@ -402,21 +405,53 @@ public class Execution implements Serializable {
 		}
 	}
 
+	/**
+	 * Sends the stop RPC call to the TaskManager hosting this execution.
+	 *
+	 * <p>The {@code StopTask} message is retried up to {@code NUM_STOP_CALL_TRIES}
+	 * times with the configured timeout. If the RPC fails altogether, this
+	 * execution is failed via {@link #fail(Throwable)}; if the TaskManager
+	 * responds that the task could not be stopped, the outcome is only logged.
+	 *
+	 * <p>NOTE(review): if no slot has been assigned yet, the call silently does
+	 * nothing — confirm this is intended for not-yet-deployed executions.
+	 */
+	public void stop() {
+		final SimpleSlot slot = this.assignedResource;
+
+		if (slot != null) {
+			final ActorGateway gateway = slot.getInstance().getActorGateway();
+
+			// retry the stop message a bounded number of times before giving up
+			Future<Object> stopResult = gateway.retry(
+				new StopTask(attemptId),
+				NUM_STOP_CALL_TRIES,
+				timeout,
+				executionContext);
+
+			stopResult.onComplete(new OnComplete<Object>() {
+				@Override
+				public void onComplete(Throwable failure, Object success) throws Throwable {
+					if (failure != null) {
+						// the RPC itself failed (e.g. all retries timed out)
+						fail(new Exception("Task could not be stopped.", failure));
+					} else {
+						TaskOperationResult result = (TaskOperationResult) success;
+						if (!result.success()) {
+							// task declined the stop request; log only, do not fail the execution
+							LOG.info("Stopping task was not successful. Description: {}",
+									result.description());
+						}
+					}
+				}
+			}, executionContext);
+		}
+	}
+
 	public void cancel() {
 		// depending on the previous state, we go directly to cancelled (no cancel call necessary)
 		// -- or to canceling (cancel call needs to be sent to the task manager)
-		
+
 		// because of several possibly previous states, we need to again loop until we make a
 		// successful atomic state transition
 		while (true) {
-			
+
 			ExecutionState current = this.state;
-			
+
 			if (current == CANCELING || current == CANCELED) {
 				// already taken care of, no need to cancel again
 				return;
 			}
-				
+
 			// these two are the common cases where we need to send a cancel call
 			else if (current == RUNNING || current == DEPLOYING) {
 				// try to transition to canceling, if successful, send the cancel call
@@ -426,7 +461,7 @@ public class Execution implements Serializable {
 				}
 				// else: fall through the loop
 			}
-			
+
 			else if (current == FINISHED || current == FAILED) {
 				// nothing to do any more. finished failed before it could be cancelled.
 				// in any case, the task is removed from the TaskManager already
@@ -437,10 +472,10 @@ public class Execution implements Serializable {
 			else if (current == CREATED || current == SCHEDULED) {
 				// from here, we can directly switch to cancelled, because no task has been deployed
 				if (transitionState(current, CANCELED)) {
-					
+
 					// we skip the canceling state. set the timestamp, for a consistent appearance
 					markTimestamp(CANCELING, getStateTimestamp(CANCELED));
-					
+
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
 						if (assignedResource != null) {
@@ -745,7 +780,7 @@ public class Execution implements Serializable {
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
 
 		// we may need to loop multiple times (in the presence of concurrent calls) in order to
-		// atomically switch to failed 
+		// atomically switch to failed
 		while (true) {
 			ExecutionState current = this.state;
 
@@ -775,7 +810,7 @@ public class Execution implements Serializable {
 				finally {
 					vertex.executionFailed(t);
 				}
-				
+
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 20288fb..7d83ae2 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -86,6 +87,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 /**
  * The execution graph is the central data structure that coordinates the distributed
  * execution of a data flow. It keeps representations of each parallel task, each
@@ -122,7 +124,7 @@ public class ExecutionGraph implements Serializable {
 
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/** The lock used to secure all access to mutable fields, especially the tracking of progress
@@ -143,12 +145,15 @@ public class ExecutionGraph implements Serializable {
 	/** The job configuration that was originally attached to the JobGraph. */
 	private final Configuration jobConfiguration;
 
+	/** {@code true} if all source tasks are stoppable. */
+	private boolean isStoppable = true;
+
 	/** All job vertices that are part of this graph */
 	private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
 
 	/** All vertices, in the order in which they were created **/
 	private final List<ExecutionJobVertex> verticesInCreationOrder;
-	
+
 	/** All intermediate results that are part of this graph */
 	private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
 
@@ -204,7 +209,7 @@ public class ExecutionGraph implements Serializable {
 
 	/** The number of job vertices that have reached a terminal state */
 	private volatile int numFinishedJobVertices;
-	
+
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
 	/** The scheduler to use for scheduling new tasks as they are needed */
@@ -218,7 +223,7 @@ public class ExecutionGraph implements Serializable {
 	/** The classloader for the user code. Needed for calls into user code classes */
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private ClassLoader userClassLoader;
-	
+
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointCoordinator checkpointCoordinator;
@@ -277,9 +282,11 @@ public class ExecutionGraph implements Serializable {
 			List<URL> requiredClasspaths,
 			ClassLoader userClassLoader) {
 
-		if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
-			throw new NullPointerException();
-		}
+		checkNotNull(executionContext);
+		checkNotNull(jobId);
+		checkNotNull(jobName);
+		checkNotNull(jobConfig);
+		checkNotNull(userClassLoader);
 
 		this.executionContext = executionContext;
 
@@ -308,7 +315,7 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Configuration of Data-flow wide execution settings  
+	//  Configuration of Data-flow wide execution settings
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -338,7 +345,7 @@ public class ExecutionGraph implements Serializable {
 	public boolean isArchived() {
 		return isArchived;
 	}
-	
+
 	public void enableSnapshotCheckpointing(
 			long interval,
 			long checkpointTimeout,
@@ -365,7 +372,7 @@ public class ExecutionGraph implements Serializable {
 		ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
 		ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
 		ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
-		
+
 		// disable to make sure existing checkpoint coordinators are cleared
 		disableSnaphotCheckpointing();
 
@@ -399,7 +406,7 @@ public class ExecutionGraph implements Serializable {
 				completedCheckpointStore,
 				recoveryMode,
 				checkpointStatsTracker);
-		
+
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)
 		registerJobStatusListener(
@@ -435,7 +442,7 @@ public class ExecutionGraph implements Serializable {
 		if (state != JobStatus.CREATED) {
 			throw new IllegalStateException("Job must be in CREATED state");
 		}
-		
+
 		if (checkpointCoordinator != null) {
 			checkpointCoordinator.shutdown();
 			checkpointCoordinator = null;
@@ -485,9 +492,9 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Properties and Status of the Execution Graph  
+	//  Properties and Status of the Execution Graph
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns a list of BLOB keys referring to the JAR files required to run this job
 	 * @return list of BLOB keys referring to the JAR files required to run this job
@@ -530,6 +537,10 @@ public class ExecutionGraph implements Serializable {
 		return jobName;
 	}
 
+	/**
+	 * Returns whether this job can be stopped cleanly, i.e. whether all of its
+	 * source (input) vertices are stoppable.
+	 *
+	 * @return {@code true} if all source tasks are stoppable, {@code false} otherwise
+	 */
+	public boolean isStoppable() {
+		return this.isStoppable;
+	}
+
 	public Configuration getJobConfiguration() {
 		return jobConfiguration;
 	}
@@ -558,7 +569,7 @@ public class ExecutionGraph implements Serializable {
 		// we return a specific iterator that does not fail with concurrent modifications
 		// the list is append only, so it is safe for that
 		final int numElements = this.verticesInCreationOrder.size();
-		
+
 		return new Iterable<ExecutionJobVertex>() {
 			@Override
 			public Iterator<ExecutionJobVertex> iterator() {
@@ -688,6 +699,10 @@ public class ExecutionGraph implements Serializable {
 
 		for (JobVertex jobVertex : topologiallySorted) {
 
+			if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
+				this.isStoppable = false;
+			}
+
 			// create the execution job vertex and attach it to the graph
 			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
 			ejv.connectToPredecessors(this.intermediateResults);
@@ -705,20 +720,20 @@ public class ExecutionGraph implements Serializable {
 							res.getId(), res, previousDataSet));
 				}
 			}
-			
+
 			this.verticesInCreationOrder.add(ejv);
 		}
 	}
-	
+
 	public void scheduleForExecution(Scheduler scheduler) throws JobException {
 		if (scheduler == null) {
 			throw new IllegalArgumentException("Scheduler must not be null.");
 		}
-		
+
 		if (this.scheduler != null && this.scheduler != scheduler) {
 			throw new IllegalArgumentException("Cannot use different schedulers for the same job");
 		}
-		
+
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
 			this.scheduler = scheduler;
 
@@ -754,7 +769,7 @@ public class ExecutionGraph implements Serializable {
 	public void cancel() {
 		while (true) {
 			JobStatus current = state;
-			
+
 			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
 					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
@@ -790,6 +805,18 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
+	/**
+	 * Sends the stop signal to this job by stopping all of its source vertices
+	 * (vertices with no inputs). Non-source vertices are expected to terminate
+	 * once their inputs finish.
+	 *
+	 * @throws StoppingException if the job is not stoppable
+	 *         (see {@link #isStoppable()})
+	 */
+	public void stop() throws StoppingException {
+		if(this.isStoppable) {
+			for(ExecutionVertex ev : this.getAllExecutionVertices()) {
+				if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+					ev.stop();
+				}
+			}
+		} else {
+			throw new StoppingException("This job is not stoppable.");
+		}
+	}
+
 	public void fail(Throwable t) {
 		while (true) {
 			JobStatus current = state;
@@ -808,10 +835,10 @@ public class ExecutionGraph implements Serializable {
 					// set the state of the job to failed
 					transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
 				}
-				
+
 				return;
 			}
-			
+
 			// no need to treat other states
 		}
 	}
@@ -836,15 +863,15 @@ public class ExecutionGraph implements Serializable {
 				this.currentExecutions.clear();
 
 				Collection<CoLocationGroup> colGroups = new HashSet<>();
-				
+
 				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
-					
+
 					CoLocationGroup cgroup = jv.getCoLocationGroup();
 					if(cgroup != null && !colGroups.contains(cgroup)){
 						cgroup.resetConstraints();
 						colGroups.add(cgroup);
 					}
-					
+
 					jv.resetForNewExecution();
 				}
 
@@ -853,7 +880,7 @@ public class ExecutionGraph implements Serializable {
 				}
 				numFinishedJobVertices = 0;
 				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
-				
+
 				// if we have checkpointed state, reload it into the executions
 				if (checkpointCoordinator != null) {
 					checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
@@ -962,7 +989,7 @@ public class ExecutionGraph implements Serializable {
 			}
 		}
 	}
-	
+
 	private boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
@@ -989,14 +1016,14 @@ public class ExecutionGraph implements Serializable {
 			}
 
 			numFinishedJobVertices++;
-			
+
 			if (numFinishedJobVertices == verticesInCreationOrder.size()) {
-				
+
 				// we are done, transition to the final state
 				JobStatus current;
 				while (true) {
 					current = this.state;
-					
+
 					if (current == JobStatus.RUNNING) {
 						if (transitionState(current, JobStatus.FINISHED)) {
 							postRunCleanup();
@@ -1066,7 +1093,7 @@ public class ExecutionGraph implements Serializable {
 
 	/**
 	 * Updates the state of one of the ExecutionVertex's Execution attempts.
-	 * If the new status if "FINISHED", this also updates the 
+	 * If the new status if "FINISHED", this also updates the
 	 * 
 	 * @param state The state update.
 	 * @return True, if the task update was properly applied, false, if the execution attempt was not found.
@@ -1184,7 +1211,7 @@ public class ExecutionGraph implements Serializable {
 			this.executionListenerActors.add(listener);
 		}
 	}
-	
+
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if (jobStatusListenerActors.size() > 0) {
 			ExecutionGraphMessages.JobStatusChanged message =
@@ -1196,7 +1223,7 @@ public class ExecutionGraph implements Serializable {
 			}
 		}
 	}
-	
+
 	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
 							newExecutionState, Throwable error)
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 165dce4..e522c8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -73,7 +73,6 @@ public class ExecutionVertex implements Serializable {
 
 	private static final long serialVersionUID = 42L;
 
-	@SuppressWarnings("unused")
 	private static final Logger LOG = ExecutionGraph.LOG;
 
 	private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
@@ -467,6 +466,10 @@ public class ExecutionVertex implements Serializable {
 		this.currentExecution.cancel();
 	}
 
+	/**
+	 * Forwards the stop signal to the current execution attempt of this vertex.
+	 */
+	public void stop() {
+		this.currentExecution.stop();
+	}
+
 	public void fail(Throwable t) {
 		this.currentExecution.fail(t);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e20f737..f99d754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -81,6 +81,7 @@ public class JobGraph implements Serializable {
 
 	/** Configuration which defines which restart strategy to use for the job recovery */
 	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
+	
 
 	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 	 * job manager after it has been executed. */

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 8ebc30c..9018029 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
@@ -38,8 +39,8 @@ public class JobVertex implements java.io.Serializable {
 	private static final long serialVersionUID = 1L;
 
 	private static final String DEFAULT_NAME = "(unnamed vertex)";
-	
-	
+
+
 	// --------------------------------------------------------------------------------------------
 	// Members that define the structure / topology of the graph
 	// --------------------------------------------------------------------------------------------
@@ -62,15 +63,18 @@ public class JobVertex implements java.io.Serializable {
 	/** The class of the invokable. */
 	private String invokableClassName;
 
+	/** Indicates of this job vertex is stoppable or not. */
+	private boolean isStoppable = false;
+
 	/** Optionally, a source of input splits */
 	private InputSplitSource<?> inputSplitSource;
-	
+
 	/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
 	private String name;
-	
+
 	/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
 	private SlotSharingGroup slotSharingGroup;
-	
+
 	/** The group inside which the vertex subtasks share slots */
 	private CoLocationGroup coLocationGroup;
 
@@ -83,11 +87,11 @@ public class JobVertex implements java.io.Serializable {
 
 	/** Optional, pretty name of the operator, to be displayed in the JSON plan */
 	private String operatorPrettyName;
-	
+
 	/** Optional, the JSON for the optimizer properties of the operator result,
 	 * to be included in the JSON plan */
 	private String resultOptimizerProperties;
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -98,7 +102,7 @@ public class JobVertex implements java.io.Serializable {
 	public JobVertex(String name) {
 		this(name, null);
 	}
-	
+
 	/**
 	 * Constructs a new job vertex and assigns it with the given name.
 	 * 
@@ -109,9 +113,9 @@ public class JobVertex implements java.io.Serializable {
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = id == null ? new JobVertexID() : id;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the ID of this job vertex.
 	 * 
@@ -120,7 +124,7 @@ public class JobVertex implements java.io.Serializable {
 	public JobVertexID getID() {
 		return this.id;
 	}
-	
+
 	/**
 	 * Returns the name of the vertex.
 	 * 
@@ -129,7 +133,7 @@ public class JobVertex implements java.io.Serializable {
 	public String getName() {
 		return this.name;
 	}
-	
+
 	/**
 	 * Sets the name of the vertex
 	 * 
@@ -168,12 +172,13 @@ public class JobVertex implements java.io.Serializable {
 		}
 		return this.configuration;
 	}
-	
+
 	public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
 		Preconditions.checkNotNull(invokable);
 		this.invokableClassName = invokable.getName();
+		this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);
 	}
-	
+
 	/**
 	 * Returns the name of the invokable class which represents the task of this vertex.
 	 * 
@@ -182,7 +187,7 @@ public class JobVertex implements java.io.Serializable {
 	public String getInvokableClassName() {
 		return this.invokableClassName;
 	}
-	
+
 	/**
 	 * Returns the invokable class which represents the task of this vertex
 	 * 
@@ -196,7 +201,7 @@ public class JobVertex implements java.io.Serializable {
 		if (invokableClassName == null) {
 			return null;
 		}
-		
+
 		try {
 			return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
 		}
@@ -207,7 +212,7 @@ public class JobVertex implements java.io.Serializable {
 			throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
 		}
 	}
-	
+
 	/**
 	 * Gets the parallelism of the task.
 	 * 
@@ -228,7 +233,7 @@ public class JobVertex implements java.io.Serializable {
 		}
 		this.parallelism = parallelism;
 	}
-	
+
 	public InputSplitSource<?> getInputSplitSource() {
 		return inputSplitSource;
 	}
@@ -236,15 +241,15 @@ public class JobVertex implements java.io.Serializable {
 	public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
 		this.inputSplitSource = inputSplitSource;
 	}
-	
+
 	public List<IntermediateDataSet> getProducedDataSets() {
 		return this.results;
 	}
-	
+
 	public List<JobEdge> getInputs() {
 		return this.inputs;
 	}
-	
+
 	/**
 	 * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
 	 * slot sharing group can run one subtask each in the same slot.
@@ -255,13 +260,13 @@ public class JobVertex implements java.io.Serializable {
 		if (this.slotSharingGroup != null) {
 			this.slotSharingGroup.removeVertexFromGroup(id);
 		}
-		
+
 		this.slotSharingGroup = grp;
 		if (grp != null) {
 			grp.addVertexToGroup(id);
 		}
 	}
-	
+
 	/**
 	 * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
 	 * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
@@ -272,7 +277,7 @@ public class JobVertex implements java.io.Serializable {
 	public SlotSharingGroup getSlotSharingGroup() {
 		return slotSharingGroup;
 	}
-	
+
 	/**
 	 * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
 	 * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
@@ -294,10 +299,10 @@ public class JobVertex implements java.io.Serializable {
 		if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
 			throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
 		}
-		
+
 		CoLocationGroup thisGroup = this.coLocationGroup;
 		CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
-		
+
 		if (otherGroup == null) {
 			if (thisGroup == null) {
 				CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
@@ -320,11 +325,11 @@ public class JobVertex implements java.io.Serializable {
 			}
 		}
 	}
-	
+
 	public CoLocationGroup getCoLocationGroup() {
 		return coLocationGroup;
 	}
-	
+
 	public void updateCoLocationGroup(CoLocationGroup group) {
 		this.coLocationGroup = group;
 	}
@@ -384,38 +389,42 @@ public class JobVertex implements java.io.Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public boolean isInputVertex() {
 		return this.inputs.isEmpty();
 	}
-	
+
+	public boolean isStoppable() {
+		return this.isStoppable;
+	}
+
 	public boolean isOutputVertex() {
 		return this.results.isEmpty();
 	}
-	
+
 	public boolean hasNoConnectedInputs() {
 		for (JobEdge edge : inputs) {
 			if (!edge.isIdReference()) {
 				return false;
 			}
 		}
-		
+
 		return true;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * A hook that can be overwritten by sub classes to implement logic that is called by the 
+	 * A hook that can be overwritten by sub classes to implement logic that is called by the
 	 * master when the job starts.
 	 * 
 	 * @param loader The class loader for user defined code.
 	 * @throws Exception The method may throw exceptions which cause the job to fail immediately.
 	 */
 	public void initializeOnMaster(ClassLoader loader) throws Exception {}
-	
+
 	/**
-	 * A hook that can be overwritten by sub classes to implement logic that is called by the 
+	 * A hook that can be overwritten by sub classes to implement logic that is called by the
 	 * master after the job completed.
 	 * 
 	 * @param loader The class loader for user defined code.
@@ -458,7 +467,7 @@ public class JobVertex implements java.io.Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return this.name + " (" + this.invokableClassName + ')';

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
new file mode 100644
index 0000000..383a0d2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobgraph.tasks;
+
+/**
+ * Implemented by tasks that can receive the STOP signal.
+ */
+public interface StoppableTask {
+	/** Called on STOP signal. */
+	public void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7832720..81dc01f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -49,8 +49,10 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
+import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -220,7 +223,7 @@ public class Task implements Runnable {
 	private volatile long recoveryTs;
 
 	/**
-	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
+	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
 	 * be undone in the case of a failing task deployment.</p>
 	 */
 	public Task(TaskDeploymentDescriptor tdd,
@@ -308,7 +311,7 @@ public class Task implements Runnable {
 		}
 
 		invokableHasBeenCanceled = new AtomicBoolean(false);
-		
+
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
 	}
@@ -336,15 +339,15 @@ public class Task implements Runnable {
 	public Configuration getJobConfiguration() {
 		return jobConfiguration;
 	}
-	
+
 	public Configuration getTaskConfiguration() {
 		return this.taskConfiguration;
 	}
-	
+
 	public ResultPartitionWriter[] getAllWriters() {
 		return writers;
 	}
-	
+
 	public SingleInputGate[] getAllInputGates() {
 		return inputGates;
 	}
@@ -445,7 +448,7 @@ public class Task implements Runnable {
 
 		try {
 			// ----------------------------
-			//  Task Bootstrap - We periodically 
+			//  Task Bootstrap - We periodically
 			//  check for canceling as a shortcut
 			// ----------------------------
 
@@ -636,7 +639,7 @@ public class Task implements Runnable {
 						LOG.error("Unexpected state in Task during an exception: " + current);
 						break;
 					}
-					// else fall through the loop and 
+					// else fall through the loop and
 				}
 			}
 			catch (Throwable tt) {
@@ -655,7 +658,7 @@ public class Task implements Runnable {
 				if (dispatcher != null && !dispatcher.isShutdown()) {
 					dispatcher.shutdownNow();
 				}
-				
+
 				// free the network resources
 				network.unregisterTask(this);
 
@@ -743,10 +746,39 @@ public class Task implements Runnable {
 	}
 
 	// ----------------------------------------------------------------------------------------------------------------
-	//  Canceling / Failing the task from the outside
+	//  Stopping / Canceling / Failing the task from the outside
 	// ----------------------------------------------------------------------------------------------------------------
 
 	/**
+	 * Stops the executing task by calling {@link StoppableTask#stop()}.
+	 * <p>
+	 * This method never blocks.
+	 * </p>
+	 * 
+	 * @throws UnsupportedOperationException
+	 *             if the {@link AbstractInvokable} does not implement {@link StoppableTask}
+	 */
+	public void stopExecution() throws UnsupportedOperationException {
+		LOG.info("Attempting to stop task " + taskNameWithSubtask);
+		if(this.invokable instanceof StoppableTask) {
+			Runnable runnable = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						((StoppableTask)Task.this.invokable).stop();
+					} catch(RuntimeException e) {
+						LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
+						taskManager.tell(new FailTask(executionId, e));
+					}
+				}
+			};
+			executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask);
+		} else {
+			throw new UnsupportedOperationException("Stopping not supported by this task.");
+		}
+	}
+
+	/**
 	 * Cancels the task execution. If the task is already in a terminal state
 	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
 	 * Otherwise it sets the state to CANCELING, and, if the invokable code is running,
@@ -853,7 +885,7 @@ public class Task implements Runnable {
 	 * {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}.
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
-	 * @param checkpointTimestamp The timestamp associated with the checkpoint.   
+	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
 	 */
 	public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
 		AbstractInvokable invokable = this.invokable;
@@ -861,7 +893,7 @@ public class Task implements Runnable {
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
 			if (invokable instanceof StatefulTask) {
 
-				// build a local closure 
+				// build a local closure
 				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
 				final String taskName = taskNameWithSubtask;
 
@@ -895,14 +927,14 @@ public class Task implements Runnable {
 			LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
 		}
 	}
-	
+
 	public void notifyCheckpointComplete(final long checkpointID) {
 		AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
 			if (invokable instanceof StatefulTask) {
 
-				// build a local closure 
+				// build a local closure
 				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
 				final String taskName = taskNameWithSubtask;
 
@@ -1069,7 +1101,7 @@ public class Task implements Runnable {
 					logger.error("Error while canceling the task", t);
 				}
 
-				// interrupt the running thread initially 
+				// interrupt the running thread initially
 				executer.interrupt();
 				try {
 					executer.join(30000);

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bd18160..6a22949 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -466,6 +466,33 @@ class JobManager(
           )
       }
 
+    case StopJob(jobID) =>
+      log.info(s"Trying to stop job with ID $jobID.")
+
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, _)) =>
+          try {
+            if (!executionGraph.isStoppable()) {
+              sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+                " is not stoppable."))
+            } else if(executionGraph.getState() != JobStatus.CREATED
+                && executionGraph.getState() != JobStatus.RUNNING
+                && executionGraph.getState() != JobStatus.RESTARTING) {
+              sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+                " is not in state CREATED, RUNNING, or RESTARTING."))
+            } else {
+              executionGraph.stop()
+              sender ! StoppingSuccess(jobID)
+            }
+          } catch {
+            case t: Throwable =>  sender ! StoppingFailure(jobID, t)
+          }
+        case None =>
+          log.info(s"No job found with ID $jobID.")
+          sender ! StoppingFailure(jobID, new IllegalArgumentException("No job found with " +
+            s"ID $jobID."))
+      }
+
     case UpdateTaskExecutionState(taskExecutionState) =>
       if (taskExecutionState == null) {
         sender ! decorateMessage(false)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 267e231..c949b4c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -96,6 +96,14 @@ object JobManagerMessages {
   case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
 
   /**
+   * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of
+   * stopping is sent back to the sender as a [[StoppingResponse]] message.
+   *
+   * @param jobID The ID of the job to stop.
+   */
+  case class StopJob(jobID: JobID) extends RequiresLeaderSessionID
+
+  /**
    * Requesting next input split for the
    * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
    * of the job specified by [[jobID]]. The next input split is sent back to the sender as a
@@ -238,6 +246,23 @@ object JobManagerMessages {
    */
   case class CancellationFailure(jobID: JobID, cause: Throwable) extends CancellationResponse
 
+  sealed trait StoppingResponse {
+    def jobID: JobID
+  }
+
+  /**
+   * Denotes a successful (streaming) job stopping.
+   *
+   * @param jobID The ID of the successfully stopped job.
+   */
+  case class StoppingSuccess(jobID: JobID) extends StoppingResponse
+
+  /**
+   * Denotes a failed (streaming) job stopping.
+   *
+   * @param jobID The ID of the job that could not be stopped.
+   * @param cause The cause of the stopping failure.
+   */
+  case class StoppingFailure(jobID: JobID, cause: Throwable) extends StoppingResponse
+
   /**
    * Requests all currently running jobs from the job manager. This message triggers a
    * [[RunningJobs]] response.

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
index a80ca99..94762ee 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -59,6 +59,15 @@ object TaskMessages {
     extends TaskMessage with RequiresLeaderSessionID
 
   /**
+   * Stops the task associated with [[attemptID]]. The result is sent back to the sender as a
+   * [[TaskOperationResult]] message.
+   *
+   * @param attemptID The task's execution attempt ID.
+   */
+  case class StopTask(attemptID: ExecutionAttemptID)
+    extends TaskMessage with RequiresLeaderSessionID
+
+  /**
    * Triggers a fail of specified task from the outside (as opposed to the task throwing
    * an exception itself) with the given exception as the cause.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3b68878..12bc426 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -419,6 +419,28 @@ class TaskManager(
             log.debug(s"Cannot find task to fail for execution $executionID)")
           }
 
+        // stops a task
+        case StopTask(executionID) =>
+          val task = runningTasks.get(executionID)
+          if (task != null) {
+            try {
+              task.stopExecution()
+              sender ! decorateMessage(new TaskOperationResult(executionID, true))
+            } catch {
+              case t: Throwable =>
+                sender ! decorateMessage(new TaskOperationResult(executionID, false,
+                  t.getClass().getSimpleName() + ": " + t.getLocalizedMessage()))
+            }
+          } else {
+            log.debug(s"Cannot find task to stop for execution $executionID.")
+            sender ! decorateMessage(
+              new TaskOperationResult(
+               executionID,
+               false,
+               "No task with that execution ID was found.")
+            )
+          }
+
         // cancels a task
         case CancelTask(executionID) =>
           val task = runningTasks.get(executionID)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 2ca51db..ee372dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -102,12 +102,12 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg,
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -146,12 +146,12 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -213,12 +213,12 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -467,12 +467,12 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -523,12 +523,12 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 			fail("Attached wrong jobgraph");
@@ -584,12 +584,12 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jobId,
-					jobName,
-					cfg,
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(), 
+				jobId, 
+				jobName, 
+				cfg, 
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
 			try {
 				eg.attachJobGraph(ordered);
 			}
@@ -629,12 +629,12 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jobId,
-					jobName,
-					cfg,
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(), 
+				jobId, 
+				jobName,
+				cfg, 
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
 
 			try {
 				eg.attachJobGraph(ordered);
@@ -700,12 +700,13 @@ public class ExecutionGraphConstructionTest {
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jobId,
-					jobName,
-					cfg,
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(), 
+				jobId, 
+				jobName, 
+				cfg, 
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
+			
 			eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
 			
 			// check the v1 / v2 co location hints ( assumes parallelism(v1) >= parallelism(v2) )

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 9221bda..6362732 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -80,12 +80,12 @@ public class ExecutionGraphDeploymentTest {
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jobId,
-					"some job",
-					new Configuration(),
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(), 
+				jobId, 
+				"some job", 
+				new Configuration(), 
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
 
 			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -283,12 +283,13 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.directExecutionContext(),
-				jobId,
-				"some job",
-				new Configuration(),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.directExecutionContext(), 
+			jobId, 
+			"some job", 
+			new Configuration(),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
+		
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
@@ -328,4 +329,4 @@ public class ExecutionGraphDeploymentTest {
 			throw new Exception();
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
new file mode 100644
index 0000000..3712861
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+	private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+	private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+	private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+	private ExecutionGraph eg;
+	private Field f;
+
+	@Before
+	public void prepare() throws Exception {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+
+
+		assert (mockEJV.length == 5);
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		JobVertex v4 = new JobVertex("vertex4");
+		JobVertex v5 = new JobVertex("vertex5");
+
+		for(int i = 0; i < mockEJV.length; ++i) {
+			mockEJV[i] = mock(ExecutionJobVertex.class);
+
+			this.mockEV[i] = new ExecutionVertex[dop[i]];
+			for (int j = 0; j < dop[i]; ++j) {
+				this.mockEV[i][j] = mock(ExecutionVertex.class);
+			}
+
+			when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+			when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+		}
+
+		PowerMockito
+			.whenNew(ExecutionJobVertex.class)
+			.withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+		PowerMockito
+			.whenNew(ExecutionJobVertex.class)
+			.withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+		PowerMockito
+			.whenNew(ExecutionJobVertex.class)
+			.withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
+				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
+		PowerMockito
+			.whenNew(ExecutionJobVertex.class)
+			.withArguments(any(ExecutionGraph.class), same(v4), any(Integer.class).intValue(),
+				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[3]);
+		PowerMockito
+			.whenNew(ExecutionJobVertex.class)
+			.withArguments(any(ExecutionGraph.class), same(v5), any(Integer.class).intValue(),
+				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[4]);
+
+		v1.setParallelism(dop[0]);
+		v2.setParallelism(dop[1]);
+		v3.setParallelism(dop[2]);
+		v4.setParallelism(dop[3]);
+		v5.setParallelism(dop[4]);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+		mockNumberOfInputs(1,0);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+		mockNumberOfInputs(3,1);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+		mockNumberOfInputs(3,2);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+		mockNumberOfInputs(4,3);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+		mockNumberOfInputs(4,2);
+
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+		eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobId, jobName,
+				cfg, AkkaUtils.getDefaultTimeout());
+		eg.attachJobGraph(ordered);
+
+		f = eg.getClass().getDeclaredField("state");
+		f.setAccessible(true);
+	}
+
+	private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) {
+		for(int j = 0; j < dop[nodeIndex]; ++j) {
+			when(mockEV[nodeIndex][j].getNumberOfInputs()).thenReturn(dop[predecessorIndex]);
+		}
+	}
+
+	@Test
+	public void testCancel() throws Exception {
+		Assert.assertEquals(JobStatus.CREATED, eg.getState());
+		eg.cancel();
+
+		verifyCancel(1);
+
+		f.set(eg, JobStatus.RUNNING);
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+
+		f.set(eg, JobStatus.CANCELED);
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+
+		f.set(eg, JobStatus.FAILED);
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.FAILED, eg.getState());
+
+		f.set(eg, JobStatus.FAILING);
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+
+		f.set(eg, JobStatus.FINISHED);
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+
+		f.set(eg, JobStatus.RESTARTING);
+		eg.cancel();
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+	}
+
+	private void verifyCancel(int times) {
+		for (int i = 0; i < mockEJV.length; ++i) {
+			verify(mockEJV[i], times(times)).cancel();
+		}
+
+	}
+
+	// test that all source tasks receive STOP signal
+	// test that all non-source tasks do not receive STOP signal
+	@Test
+	public void testStop() throws Exception {
+		Field f = eg.getClass().getDeclaredField("isStoppable");
+		f.setAccessible(true);
+		f.set(eg, true);
+
+		eg.stop();
+
+		for (int i : new int[]{0,2}) {
+			for (int j = 0; j < mockEV[i].length; ++j) {
+				verify(mockEV[i][j], times(1)).stop();
+			}
+		}
+
+		for (int i : new int[]{1,3,4}) {
+			for (int j = 0; j < mockEV[i].length; ++j) {
+				verify(mockEV[i][j], times(0)).stop();
+			}
+		}
+	}
+
+	// STOP only supported if all sources are stoppable 
+	@Test(expected = StoppingException.class)
+	public void testStopBatching() throws StoppingException {
+		eg.stop();
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 35ab8ac..ca07fbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -170,12 +170,12 @@ public class ExecutionGraphTestUtils {
 		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 		ExecutionGraph graph = new ExecutionGraph(
-				executionContext,
-				new JobID(),
-				"test job",
-				new Configuration(),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			executionContext, 
+			new JobID(), 
+			"test job", 
+			new Configuration(), 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 
 		ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
 				AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 9c520b6..5dd5ba6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -1,87 +1,86 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-	@Test
-	public void testAccumulatedStateFinished() {
-		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-
-			JobVertex ajv = new JobVertex("TestVertex", vid);
-			ajv.setParallelism(3);
-			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-			ExecutionGraph graph = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jid,
-					"test job",
-					new Configuration(),
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
-
-			graph.attachJobGraph(Arrays.asList(ajv));
-
-			setGraphStatus(graph, JobStatus.RUNNING);
-
-			ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-			// mock resources and mock taskmanager
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				SimpleSlot slot = getInstance(
-						new SimpleActorGateway(
-								TestingUtils.defaultExecutionContext())
-				).allocateSimpleSlot(jid);
-				ee.deployToSlot(slot);
-			}
-
-			// finish all
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				ee.executionFinished();
-			}
-
-			assertTrue(ejv.isInFinalState());
-			assertEquals(JobStatus.FINISHED, graph.getState());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+
+public class ExecutionStateProgressTest {
+
+	@Test
+	public void testAccumulatedStateFinished() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+
+			JobVertex ajv = new JobVertex("TestVertex", vid);
+			ajv.setParallelism(3);
+			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+
+			ExecutionGraph graph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(), 
+				jid, 
+				"test job", 
+				new Configuration(), 
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
+			graph.attachJobGraph(Arrays.asList(ajv));
+
+			setGraphStatus(graph, JobStatus.RUNNING);
+
+			ExecutionJobVertex ejv = graph.getJobVertex(vid);
+
+			// mock resources and mock taskmanager
+			for (ExecutionVertex ee : ejv.getTaskVertices()) {
+				SimpleSlot slot = getInstance(
+						new SimpleActorGateway(
+								TestingUtils.defaultExecutionContext())
+				).allocateSimpleSlot(jid);
+				ee.deployToSlot(slot);
+			}
+
+			// finish all
+			for (ExecutionVertex ee : ejv.getTaskVertices()) {
+				ee.executionFinished();
+			}
+
+			assertTrue(ejv.isInFinalState());
+			assertEquals(JobStatus.FINISHED, graph.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
new file mode 100644
index 0000000..ab29e5a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest extends TestLogger {
+
+	private static ActorSystem system;
+
+	private static boolean receivedStopSignal;
+
+	@AfterClass
+	public static void teardown(){
+		if(system != null) {
+			JavaTestKit.shutdownActorSystem(system);
+			system = null;
+		}
+	}
+
+	@Test
+	public void testStop() throws Exception {
+		final JobVertexID jid = new JobVertexID();
+		final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+		Execution executionMock = mock(Execution.class);
+		whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
+
+		final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+				AkkaUtils.getDefaultTimeout());
+
+		vertex.stop();
+
+		verify(executionMock).stop();
+	}
+
+	@Test
+	public void testStopRpc() throws Exception {
+		final JobVertexID jid = new JobVertexID();
+		final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+		final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+				AkkaUtils.getDefaultTimeout());
+		final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+
+		setVertexState(vertex, ExecutionState.SCHEDULED);
+		assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+
+		final ActorGateway gateway = new StopSequenceInstanceGateway(
+				TestingUtils.defaultExecutionContext(), new TaskOperationResult(execId, true));
+
+		Instance instance = getInstance(gateway);
+		SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+
+		vertex.deployToSlot(slot);
+
+		receivedStopSignal = false;
+		vertex.stop();
+		assertTrue(receivedStopSignal);
+	}
+
+	public static class StopSequenceInstanceGateway extends BaseTestingActorGateway {
+		private static final long serialVersionUID = 7611571264006653627L;
+
+		private final TaskOperationResult result;
+
+		public StopSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult result) {
+			super(executionContext);
+			this.result = result;
+		}
+
+		@Override
+		public Object handleMessage(Object message) throws Exception {
+			Object result = null;
+			if (message instanceof TaskMessages.SubmitTask) {
+				result = Messages.getAcknowledge();
+			} else if (message instanceof TaskMessages.StopTask) {
+				result = this.result;
+				receivedStopSignal = true;
+			}
+
+			return result;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index e0da2c9..5f9717f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -267,12 +267,12 @@ public class LocalInputSplitsTest {
 			JobGraph jobGraph = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jobGraph.getJobID(),
-					jobGraph.getName(),
-					jobGraph.getJobConfiguration(),
-					TIMEOUT,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(), 
+				jobGraph.getJobID(),
+				jobGraph.getName(),  
+				jobGraph.getJobConfiguration(),
+				TIMEOUT,
+				new NoRestartStrategy());
 			
 			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 			eg.setQueuedSchedulingAllowed(false);
@@ -331,12 +331,13 @@ public class LocalInputSplitsTest {
 		JobGraph jobGraph = new JobGraph("test job", vertex);
 		
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobGraph.getJobID(),
-				jobGraph.getName(),
-				jobGraph.getJobConfiguration(),
-				TIMEOUT,
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(),
+			jobGraph.getJobID(),
+			jobGraph.getName(),  
+			jobGraph.getJobConfiguration(),
+			TIMEOUT,
+			new NoRestartStrategy());
+		
 		eg.setQueuedSchedulingAllowed(false);
 		
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index eda9115..c1afe04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -59,12 +59,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg,
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -100,12 +100,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -142,12 +142,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -185,12 +185,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName,
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -226,12 +226,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -287,12 +287,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -339,12 +339,12 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				jobId,
-				jobName,
-				cfg,
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+			TestingUtils.defaultExecutionContext(), 
+			jobId, 
+			jobName, 
+			cfg, 
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
 		try {
 			eg.attachJobGraph(ordered);
 		}


Mime
View raw message