flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [4/9] flink git commit: [FLINK-2111] Add "stop" signal to cleanly shutdown streaming jobs
Date Mon, 15 Feb 2016 15:26:02 GMT
[FLINK-2111] Add "stop" signal to cleanly shutdown streaming jobs

- added JobType to JobGraph and ExecutionGraph
- added interface Stoppable, applied to SourceStreamTask
- added STOP signal logic to JobManager, TaskManager, ExecutionGraph
- extended Client to support stop
- extended Cli frontend, JobManager frontend
- updated documenation

Fix JobManagerTest.testStopSignal and testStopSignalFail

The StoppableInvokable could not be instantiated by Task because it was declared as a private
class. Adds additional checks to verify that the stop signal behaves correctly.

Auto-detect if job is stoppable

A job is stoppable iff all sources are stoppable

- Replace JobType by stoppable flag
- Add StoppableFunction and StoppableInvokable to support the optional stop operation
- added REST get/delete test (no extra YARN test -- think not required as get/delete is both tested)
- bug fix: job got canceld instead of stopped in web interface
- Add StoppingException
- Allow to stop jobs when they are not in state RUNNING

Second round of Till's comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd4024e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd4024e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd4024e

Branch: refs/heads/master
Commit: bdd4024e20fdfb0accb6121a68780ce3a0c218c0
Parents: 5eae47f
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Sat Sep 26 13:14:43 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Feb 15 16:16:13 2016 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                |   14 +
 .../org/apache/flink/client/CliFrontend.java    |   86 +-
 .../flink/client/cli/CliFrontendParser.java     |   41 +-
 .../apache/flink/client/cli/StopOptions.java    |   37 +
 .../org/apache/flink/client/program/Client.java |   59 +-
 .../client/program/ProgramStopException.java    |   53 +
 .../flink/client/CliFrontendStopTest.java       |  159 ++
 .../org/apache/flink/storm/api/FlinkClient.java |    6 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |   13 +-
 .../api/common/functions/StoppableFunction.java |   33 +
 flink-runtime-web/pom.xml                       |   14 +
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   11 +-
 .../handlers/JobCancellationHandler.java        |    3 +
 .../webmonitor/handlers/JobDetailsHandler.java  |    1 +
 .../webmonitor/handlers/JobStoppingHandler.java |   49 +
 .../webmonitor/testutils/HttpTestClient.java    |   19 +
 .../web-dashboard/app/partials/jobs/job.jade    |    4 +
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   |    5 +
 .../app/scripts/modules/jobs/jobs.svc.coffee    |    5 +
 .../web-dashboard/web/css/vendor.css            |    1 +
 .../web-dashboard/web/js/vendor.js              | 2053 +++++++++++-------
 .../web-dashboard/web/partials/jobs/job.html    |    1 +
 .../apache/flink/runtime/StoppingException.java |   35 +
 .../flink/runtime/executiongraph/Execution.java |   93 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   93 +-
 .../runtime/executiongraph/ExecutionVertex.java |    5 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |    1 +
 .../flink/runtime/jobgraph/JobVertex.java       |   85 +-
 .../runtime/jobgraph/tasks/StoppableTask.java   |   25 +
 .../apache/flink/runtime/taskmanager/Task.java  |   60 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   27 +
 .../runtime/messages/JobManagerMessages.scala   |   25 +
 .../runtime/messages/TaskControlMessages.scala  |    9 +
 .../flink/runtime/taskmanager/TaskManager.scala |   22 +
 .../ExecutionGraphConstructionTest.java         |   97 +-
 .../ExecutionGraphDeploymentTest.java           |   27 +-
 .../ExecutionGraphSignalsTest.java              |  224 ++
 .../executiongraph/ExecutionGraphTestUtils.java |   12 +-
 .../ExecutionStateProgressTest.java             |  173 +-
 .../executiongraph/ExecutionVertexStopTest.java |  132 ++
 .../executiongraph/LocalInputSplitsTest.java    |   25 +-
 .../executiongraph/PointwisePatternTest.java    |   84 +-
 .../runtime/jobmanager/JobManagerTest.java      |  107 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  131 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |  114 +
 .../runtime/testutils/StoppableInvokable.java   |   32 +
 .../environment/StreamExecutionEnvironment.java |    9 +-
 .../flink/streaming/api/graph/StreamGraph.java  |    7 +-
 .../api/operators/StoppableStreamSource.java    |   51 +
 .../tasks/StoppableSourceStreamTask.java        |   33 +
 .../runtime/tasks/SourceStreamTaskTest.java     |   53 +-
 flink-tests/pom.xml                             |   16 +
 .../flink/test/web/WebFrontendITCase.java       |   89 +-
 53 files changed, 3389 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index c9145bc..421ed94 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -108,6 +108,10 @@ The command line can be used to
 
         ./bin/flink cancel <jobID>
 
+-   Stop a job (streaming jobs only):
+
+        ./bin/flink stop <jobID>
+
 ### Savepoints
 
 [Savepoints]({{site.baseurl}}/apis/streaming/savepoints.html) are controlled via the command line client:
@@ -248,6 +252,16 @@ Action "cancel" cancels a running program.
                                    configuration.
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] <Job ID>
+  "stop" action options:
+     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
+
+
 Action "savepoint" triggers savepoints for a running job or disposes existing ones.
 
   Syntax: savepoint [OPTIONS] <Job ID>

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index da91bca..98bf056 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -36,6 +36,7 @@ import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.cli.SavepointOptions;
+import org.apache.flink.client.cli.StopOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -56,7 +57,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -110,6 +114,7 @@ public class CliFrontend {
 	public static final String ACTION_INFO = "info";
 	private static final String ACTION_LIST = "list";
 	private static final String ACTION_CANCEL = "cancel";
+	private static final String ACTION_STOP = "stop";
 	private static final String ACTION_SAVEPOINT = "savepoint";
 
 	// config dir parameters
@@ -290,9 +295,6 @@ public class CliFrontend {
 		catch (FileNotFoundException e) {
 			return handleArgException(e);
 		}
-		catch (ProgramInvocationException e) {
-			return handleError(e);
-		}
 		catch (Throwable t) {
 			return handleError(t);
 		}
@@ -362,7 +364,7 @@ public class CliFrontend {
 	/**
 	 * Executes the info action.
 	 * 
-	 * @param args Command line arguments for the info action. 
+	 * @param args Command line arguments for the info action.
 	 */
 	protected int info(String[] args) {
 		LOG.info("Running 'info' command.");
@@ -568,6 +570,65 @@ public class CliFrontend {
 	}
 
 	/**
+	 * Executes the STOP action.
+	 * 
+	 * @param args Command line arguments for the stop action.
+	 */
+	protected int stop(String[] args) {
+		LOG.info("Running 'stop' command.");
+
+		StopOptions options;
+		try {
+			options = CliFrontendParser.parseStopCommand(args);
+		}
+		catch (CliArgsException e) {
+			return handleArgException(e);
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForStop();
+			return 0;
+		}
+
+		String[] stopArgs = options.getArgs();
+		JobID jobId;
+
+		if (stopArgs.length > 0) {
+			String jobIdString = stopArgs[0];
+			try {
+				jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+			}
+			catch (Exception e) {
+				return handleError(e);
+			}
+		}
+		else {
+			return handleArgException(new CliArgsException("Missing JobID"));
+		}
+
+		try {
+			ActorGateway jobManager = getJobManagerGateway(options);
+			Future<Object> response = jobManager.ask(new StopJob(jobId), clientTimeout);
+
+			final Object rc = Await.result(response, clientTimeout);
+
+			if (rc instanceof StoppingFailure) {
+				throw new Exception("Stopping the job with ID " + jobId + " failed.",
+						((StoppingFailure) rc).cause());
+			}
+
+			return 0;
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+	}
+
+	/**
 	 * Executes the CANCEL action.
 	 * 
 	 * @param args Command line arguments for the cancel action.
@@ -616,13 +677,14 @@ public class CliFrontend {
 			ActorGateway jobManager = getJobManagerGateway(options);
 			Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout);
 
-			try {
-				Await.result(response, clientTimeout);
-				return 0;
-			}
-			catch (Exception e) {
-				throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
+			final Object rc = Await.result(response, clientTimeout);
+
+			if (rc instanceof CancellationFailure) {
+				throw new Exception("Canceling the job with ID " + jobId + " failed.",
+						((CancellationFailure) rc).cause());
 			}
+
+			return 0;
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -1123,6 +1185,8 @@ public class CliFrontend {
 				return info(params);
 			case ACTION_CANCEL:
 				return cancel(params);
+			case ACTION_STOP:
+				return stop(params);
 			case ACTION_SAVEPOINT:
 				return savepoint(params);
 			case "-h":
@@ -1139,7 +1203,7 @@ public class CliFrontend {
 			default:
 				System.out.printf("\"%s\" is not a valid action.\n", action);
 				System.out.println();
-				System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\".");
+				System.out.println("Valid actions are \"run\", \"list\", \"info\", \"stop\", or \"cancel\".");
 				System.out.println();
 				System.out.println("Specify the version option (-v or --version) to print Flink version.");
 				System.out.println();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 07d409e..2ac53d2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -39,13 +39,13 @@ public class CliFrontendParser {
 
 
 	static final Option HELP_OPTION = new Option("h", "help", false,
-												"Show the help message for the CLI Frontend or the action.");
+			"Show the help message for the CLI Frontend or the action.");
 
 	static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
 
 	public static final Option CLASS_OPTION = new Option("c", "class", true,
 			"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
-					"JAR file does not specify the class in its manifest.");
+			"JAR file does not specify the class in its manifest.");
 
 	static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
 			"classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
@@ -55,7 +55,7 @@ public class CliFrontendParser {
 
 	static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
 			"The parallelism with which to run the program. Optional flag to override the default value " +
-					"specified in the configuration.");
+			"specified in the configuration.");
 
 	static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
 			"supress logging output to standard out.");
@@ -67,9 +67,9 @@ public class CliFrontendParser {
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
 	static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
-			"Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
-					+ "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
-					"different JobManager than the one specified in the configuration.");
+			"Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER +
+			"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
+			"different JobManager than the one specified in the configuration.");
 
 	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
 			"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
@@ -123,6 +123,7 @@ public class CliFrontendParser {
 	private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
 	private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
 	private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));
+	private static final Options STOP_OPTIONS = getStopOptions(buildGeneralOptions(new Options()));
 	private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options()));
 
 	private static Options buildGeneralOptions(Options options) {
@@ -197,6 +198,11 @@ public class CliFrontendParser {
 		return options;
 	}
 
+	private static Options getStopOptions(Options options) {
+		options = getJobManagerAddressOption(options);
+		return options;
+	}
+
 	private static Options getSavepointOptions(Options options) {
 		options = getJobManagerAddressOption(options);
 		options.addOption(SAVEPOINT_DISPOSE_OPTION);
@@ -218,6 +224,7 @@ public class CliFrontendParser {
 		printHelpForRun();
 		printHelpForInfo();
 		printHelpForList();
+		printHelpForStop();
 		printHelpForCancel();
 		printHelpForSavepoint();
 
@@ -264,6 +271,18 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
+	public static void printHelpForStop() {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setLeftPadding(5);
+		formatter.setWidth(80);
+
+		System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
+		System.out.println("\n  Syntax: stop [OPTIONS] <Job ID>");
+		formatter.setSyntaxPrefix("  \"stop\" action options:");
+		formatter.printHelp(" ", getStopOptions(new Options()));
+		System.out.println();
+	}
+
 	public static void printHelpForCancel() {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
@@ -325,6 +344,16 @@ public class CliFrontendParser {
 		}
 	}
 
+	public static StopOptions parseStopCommand(String[] args) throws CliArgsException {
+		try {
+			PosixParser parser = new PosixParser();
+			CommandLine line = parser.parse(STOP_OPTIONS, args, false);
+			return new StopOptions(line);
+		} catch (ParseException e) {
+			throw new CliArgsException(e.getMessage());
+		}
+	}
+
 	public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException {
 		try {
 			PosixParser parser = new PosixParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
new file mode 100644
index 0000000..7f246c8
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the STOP command
+ */
+public class StopOptions extends CommandLineOptions {
+
+	private final String[] args;
+
+	public StopOptions(CommandLine line) {
+		super(line);
+		this.args = line.getArgs();
+	}
+
+	public String[] getArgs() {
+		return args == null ? new String[0] : args;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 452710c..999b461 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -29,7 +29,6 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.CompilerException;
@@ -43,16 +42,17 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -413,25 +413,60 @@ public class Client {
 	 * @throws Exception In case an error occurred.
 	 */
 	public void cancel(JobID jobId) throws Exception {
-		ActorGateway jobManagerGateway = getJobManagerGateway();
+		final ActorGateway jobManagerGateway = getJobManagerGateway();
 
-		Future<Object> response;
+		final Future<Object> response;
 		try {
 			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-		} catch (Exception e) {
+		} catch (final Exception e) {
 			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
 		}
 
-		Object result = Await.result(response, timeout);
+		final Object result = Await.result(response, timeout);
 
 		if (result instanceof JobManagerMessages.CancellationSuccess) {
-			LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+			LOG.info("Job cancellation with ID " + jobId + " succeeded.");
 		} else if (result instanceof JobManagerMessages.CancellationFailure) {
-			Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-			LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+			final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+			LOG.info("Job cancellation with ID " + jobId + " failed.", t);
 			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
 		} else {
-			throw new Exception("Unknown message received while cancelling.");
+			throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
+		}
+	}
+
+	/**
+	 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+	 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
+	 * a while after sending the stop command, because after sources stopped to emit data all operators
+	 * need to finish processing.
+	 * 
+	 * @param jobId
+	 *            the job ID of the streaming program to stop
+	 * @throws ProgramStopException
+	 *             If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
+	 *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
+	 */
+	public void stop(final JobID jobId) throws Exception {
+		final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+		final Future<Object> response;
+		try {
+			response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
+		} catch (final Exception e) {
+			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+		}
+
+		final Object result = Await.result(response, timeout);
+
+		if (result instanceof JobManagerMessages.StoppingSuccess) {
+			LOG.info("Job stopping with ID " + jobId + " succeeded.");
+		} else if (result instanceof JobManagerMessages.StoppingFailure) {
+			final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
+			LOG.info("Job stopping with ID " + jobId + " failed.", t);
+			throw new Exception("Failed to stop the job because of \n" + t.getMessage());
+		} else {
+			throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
 		}
 	}
 
@@ -482,7 +517,7 @@ public class Client {
 	// ------------------------------------------------------------------------
 	//  Sessions
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Tells the JobManager to finish the session (job) defined by the given ID.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
new file mode 100644
index 0000000..a1d8a9b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+/**
+ * Exception used to indicate that there is an error during stopping of a Flink program.
+ */
+public class ProgramStopException extends Exception {
+	private static final long serialVersionUID = -906791331829704450L;
+
+	/**
+	 * Creates a <tt>ProgramStopException</tt> with the given message.
+	 * 
+	 * @param message
+	 *            The message for the exception.
+	 */
+	public ProgramStopException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a <tt>ProgramStopException</tt> for the given exception.
+	 * 
+	 * @param cause
+	 *            The exception that causes the program invocation to fail.
+	 */
+	public ProgramStopException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a <tt>ProgramStopException</tt> for the given exception with an additional message.
+	 * 
+	 * @param message
+	 *            The additional message.
+	 * @param cause
+	 *            The exception that causes the program invocation to fail.
+	 */
+	public ProgramStopException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
new file mode 100644
index 0000000..7c34c75
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest extends TestLogger {
+
+	private static ActorSystem actorSystem;
+
+	@BeforeClass
+	public static void setup() {
+		pipeSystemOutToNull();
+		clearGlobalConfiguration();
+		actorSystem = ActorSystem.create("TestingActorSystem");
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(actorSystem);
+		actorSystem = null;
+	}
+
+	@Test
+	public void testStop() throws Exception {
+		// test unrecognized option
+		{
+			String[] parameters = { "-v", "-l" };
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			int retCode = testFrontend.stop(parameters);
+			assertTrue(retCode != 0);
+		}
+
+		// test missing job id
+		{
+			String[] parameters = {};
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			int retCode = testFrontend.stop(parameters);
+			assertTrue(retCode != 0);
+		}
+
+		// test stop properly
+		{
+			JobID jid = new JobID();
+			String jidString = jid.toString();
+
+			final UUID leaderSessionID = UUID.randomUUID();
+			final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID));
+
+			final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+			String[] parameters = { jidString };
+			StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+
+			int retCode = testFrontend.stop(parameters);
+			assertTrue(retCode == 0);
+		}
+
+		// test unknown job Id
+		{
+			JobID jid1 = new JobID();
+			JobID jid2 = new JobID();
+
+			final UUID leaderSessionID = UUID.randomUUID();
+			final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID));
+
+			final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+			String[] parameters = { jid2.toString() };
+			StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+
+			assertTrue(testFrontend.stop(parameters) != 0);
+		}
+	}
+
+	protected static final class StopTestCliFrontend extends CliFrontend {
+
+		private ActorGateway jobManagerGateway;
+
+		public StopTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
+			super(CliFrontendTestUtils.getConfigDir());
+			this.jobManagerGateway = jobManagerGateway;
+		}
+
+		@Override
+		public ActorGateway getJobManagerGateway(CommandLineOptions options) {
+			return jobManagerGateway;
+		}
+	}
+
+	protected static final class CliJobManager extends FlinkUntypedActor {
+		private final JobID jobID;
+		private final UUID leaderSessionID;
+
+		public CliJobManager(final JobID jobID, final UUID leaderSessionID) {
+			this.jobID = jobID;
+			this.leaderSessionID = leaderSessionID;
+		}
+
+		@Override
+		public void handleMessage(Object message) {
+			if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
+				getSender().tell(decorateMessage(1), getSelf());
+			} else if (message instanceof JobManagerMessages.StopJob) {
+				JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message;
+
+				if (jobID != null && jobID.equals(stopJob.jobID())) {
+					getSender().tell(decorateMessage(new Status.Success(new Object())), getSelf());
+				} else {
+					getSender()
+							.tell(decorateMessage(new Status.Failure(new Exception(
+									"Wrong or no JobID"))), getSelf());
+				}
+			} else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
+				getSender().tell(decorateMessage(new JobManagerMessages.RunningJobsStatus()),
+						getSelf());
+			}
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return leaderSessionID;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 767f1b1..2541345 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -208,7 +208,7 @@ public class FlinkClient {
 		final Client client;
 		try {
 			client = new Client(configuration);
-		} catch (IOException e) {
+		} catch (final IOException e) {
 			throw new RuntimeException("Could not establish a connection to the job manager", e);
 		}
 
@@ -253,7 +253,7 @@ public class FlinkClient {
 		}
 
 		try {
-			client.cancel(jobId);
+			client.stop(jobId);
 		} catch (final Exception e) {
 			throw new RuntimeException("Cannot stop job.", e);
 		}
@@ -282,7 +282,7 @@ public class FlinkClient {
 			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
 					new Timeout(askTimeout));
 
-			Object result;
+			final Object result;
 			try {
 				result = Await.result(response, askTimeout);
 			} catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index e077aeb..66b05c6 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.IRichSpout;
 import com.google.common.collect.Sets;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -52,7 +53,7 @@ import java.util.HashMap;
  * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
  * {@link FiniteSpout#reachedEnd()} returns true.
  */
-public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
+public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
 	private static final long serialVersionUID = -218340336648247605L;
 
 	/** Number of attributes of the spouts's output tuples per stream. */
@@ -299,6 +300,16 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
 		this.isRunning = false;
 	}
 
+	/**
+	 * {@inheritDoc}
+	 * <p>
+	 * Sets the {@link #isRunning} flag to {@code false}.
+	 */
+	@Override
+	public void stop() {
+		this.isRunning = false;
+	}
+
 	@Override
 	public void close() throws Exception {
 		this.spout.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
new file mode 100644
index 0000000..a83b73f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.api.common.functions;
+
+/**
+ * Must be implemented by stoppable functions, eg, source functions of streaming jobs. The method {@link #stop()} will
+ * be called when the job received the STOP signal. On this signal, the source function must stop emitting new data and
+ * terminate gracefully.
+ */
+public interface StoppableFunction {
+	/**
+	 * Stops the source. In contrast to {@code cancel()} this is a request to the source function to shut down
+	 * gracefully. Pending data can still be emitted and it is not required to stop immediately -- however, in the near
+	 * future. The job will keep running until all emitted data is processed completely.
+	 * <p>
+	 * Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
+	 * will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the
+	 * loop and that is set to false in this method.
+	 * <p>
+	 * <strong>The call to {@code stop()} should not block and not throw any exception.</strong>
+	 */
+	public void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 1a19fb1..27bbbd5 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -44,6 +44,20 @@ under the License.
 				</includes>
 			</resource>
 		</resources>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<inherited>true</inherited>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
 	</build>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0b5de1f..67c0dab 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
@@ -245,8 +246,14 @@ public class WebRuntimeMonitor implements WebMonitor {
 			// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
 			.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
 
-			// DELETE is the preferred way of cancelling a job (Rest-conform)
-			.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()));
+			// DELETE is the preferred way of canceling a job (Rest-conform)
+			.DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler()))
+
+			// stop a job via GET (for proper integration with YARN this has to be performed via GET)
+			.GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler()))
+
+			// DELETE is the preferred way of stopping a job (Rest-conform)
+			.DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()));
 
 		if (webSubmitAllow) {
 			router

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index aae8b34..b17acdc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -25,6 +25,9 @@ import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
 
+/**
+ * Request handler for the CANCEL request.
+ */
 public class JobCancellationHandler implements RequestHandler {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 4511c17..4f31128 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -61,6 +61,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 		// basic info
 		gen.writeStringField("jid", graph.getJobID().toString());
 		gen.writeStringField("name", graph.getJobName());
+		gen.writeBooleanField("isStoppable", graph.isStoppable());
 		gen.writeStringField("state", graph.getState().name());
 		
 		// times and duration

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
new file mode 100644
index 0000000..791790a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Map;
+
+/**
+ * Request handler for the STOP request.
+ */
+public class JobStoppingHandler implements RequestHandler {
+
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		try {
+			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+			if (jobManager != null) {
+				jobManager.tell(new JobManagerMessages.StopJob(jobid));
+				return "{}";
+			}
+			else {
+				throw new Exception("No connection to the leading JobManager.");
+			}
+		}
+		catch (Exception e) {
+			throw new Exception("Failed to stop the job with id: "  + pathParams.get("jobid") + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index d7d4457..9a396d3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -165,6 +165,25 @@ public class HttpTestClient implements AutoCloseable {
 	}
 
 	/**
+	 * Sends a simple DELETE request to the given path. You only specify the $path part of
+	 * http://$host:$host/$path.
+	 *
+	 * @param path The $path to DELETE (http://$host:$host/$path)
+	 */
+	public void sendDeleteRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException {
+		if (!path.startsWith("/")) {
+			path = "/" + path;
+		}
+
+		HttpRequest getRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+				HttpMethod.DELETE, path);
+		getRequest.headers().set(HttpHeaders.Names.HOST, host);
+		getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		sendRequest(getRequest, timeout);
+	}
+
+	/**
 	 * Returns the next available HTTP response. A call to this method blocks until a response
 	 * becomes available.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
index 5b541ae..fe3e0fc 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
     span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
       | Cancel
 
+  .navbar-info.last.first(ng-if!="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')")
+    span.navbar-info-button.btn.btn-default(ng-click="stopJob($event)")
+      | Stop
+
 nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
   ul.nav.nav-tabs
     li(ui-sref-active='active')

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index f0ce892..931976d 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -80,6 +80,11 @@ angular.module('flinkApp')
     JobsService.cancelJob($stateParams.jobid).then (data) ->
       {}
 
+  $scope.stopJob = (stopEvent) ->
+    angular.element(stopEvent.currentTarget).removeClass("btn").removeClass("btn-default").html('Stopping...')
+    JobsService.stopJob($stateParams.jobid).then (data) ->
+      {}
+
   $scope.toggleHistory = ->
     $scope.showHistory = !$scope.showHistory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
index 65ae5cb..71f0921 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
@@ -282,4 +282,9 @@ angular.module('flinkApp')
     # proper "DELETE jobs/<jobid>/"
     $http.get flinkConfig.jobServer + "jobs/" + jobid + "/yarn-cancel"
 
+  @stopJob = (jobid) ->
+    # uses the non REST-compliant GET yarn-cancel handler which is available in addition to the
+    # proper "DELETE jobs/<jobid>/"
+    $http.get "jobs/" + jobid + "/yarn-stop"
+
   @

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/web/css/vendor.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/css/vendor.css b/flink-runtime-web/web-dashboard/web/css/vendor.css
index 2a8d00f..e0c9259 100644
--- a/flink-runtime-web/web-dashboard/web/css/vendor.css
+++ b/flink-runtime-web/web-dashboard/web/css/vendor.css
@@ -5902,6 +5902,7 @@ button.close {
 .modal-header {
   padding: 15px;
   border-bottom: 1px solid #e5e5e5;
+  min-height: 16.42857143px;
 }
 .modal-header .close {
   margin-top: -2px;


Mime
View raw message