flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/6] flink git commit: [FLINK-8935][tests] Implement MiniClusterClient#stop
Date Tue, 20 Mar 2018 09:14:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 463b922ab -> 2dab4374b


[FLINK-8935][tests] Implement MiniClusterClient#stop

This closes #5690.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dab4374
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dab4374
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dab4374

Branch: refs/heads/master
Commit: 2dab4374bc5280a2b4536f7ad1e153d6361a8885
Parents: ca514e1
Author: zentol <chesnay@apache.org>
Authored: Wed Mar 7 13:02:27 2018 +0100
Committer: zentol <chesnay@apache.org>
Committed: Tue Mar 20 10:14:26 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/MiniClusterClient.java   |  2 +-
 .../apache/flink/runtime/minicluster/MiniCluster.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dab4374/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 4354267..276df62 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -113,7 +113,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
 	@Override
 	public void stop(JobID jobId) throws Exception {
-		throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
+		guardWithSingleRetry(() -> miniCluster.stopJob(jobId), scheduledExecutor).get();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2dab4374/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index e958005..bc75a54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -512,6 +512,17 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync
{
 		}
 	}
 
+	public CompletableFuture<Acknowledge> stopJob(JobID jobId) {
+		try {
+			return getDispatcherGateway().stopJob(jobId, rpcTimeout);
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Could not stop job %s.", jobId),
+					e));
+		}
+	}
+
 	public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory,
boolean cancelJob) {
 		try {
 			return getDispatcherGateway().triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout);


Mime
View raw message