flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-1818] add cancel method to Client
Date Wed, 22 Jul 2015 18:17:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master aa6a7f04c -> 0c538915b


[FLINK-1818] add cancel method to Client

This closes #642.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c538915
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c538915
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c538915

Branch: refs/heads/master
Commit: 0c538915b23d9ad4e410036c9ac18c3e928ce251
Parents: aa6a7f0
Author: rainiraj <rainiraj@gmail.com>
Authored: Thu Apr 30 08:28:29 2015 -0700
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Jul 22 20:16:59 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java | 43 ++++++++++++++++++++
 1 file changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c538915/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index c544e8d..2908f92 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -27,6 +27,8 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -52,9 +54,12 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -421,6 +426,44 @@ public class Client {
 		}
 	}
 
+	/**
+	 * Cancels a job identified by the job id.
+	 * @param jobId the job id
+	 * @throws Exception In case an error occurred.
+	 */
+	public void cancel(JobID jobId) throws Exception {
+		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+		ActorSystem actorSystem;
+		try {
+			actorSystem = JobClient.startJobClientActorSystem(configuration);
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Could start client actor system.", e);
+		}
+
+		ActorRef jobManager;
+		try {
+			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout);
+		} catch (Exception e) {
+			LOG.error("Error in getting the remote reference for the job manager", e);
+			throw new ProgramInvocationException("Failed to resolve JobManager", e);
+		}
+
+		Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(timeout));
+		Object result = Await.result(response, timeout);
+
+		if (result instanceof JobManagerMessages.CancellationSuccess) {
+			LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+		} else if (result instanceof JobManagerMessages.CancellationFailure) {
+			Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+			LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+		} else {
+			throw new Exception("Unknown message received while cancelling.");
+		}
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {


Mime
View raw message