flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/6] flink git commit: [FLINK-8700] Add ClusterClient.getJobStatus()
Date Sun, 11 Mar 2018 15:34:21 GMT
[FLINK-8700] Add ClusterClient.getJobStatus()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e253b5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e253b5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e253b5d

Branch: refs/heads/release-1.5
Commit: 9e253b5d1ecaf0066d7aeae2828eee4f4671b2e8
Parents: dbb8d2f
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Feb 26 11:53:47 2018 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sun Mar 11 08:31:56 2018 -0700

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 29 ++++++++++++++++++++
 .../flink/client/program/MiniClusterClient.java |  1 +
 .../client/program/rest/RestClusterClient.java  | 14 ++++++++++
 3 files changed, 44 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index a4880db..1a783fc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -584,6 +585,34 @@ public abstract class ClusterClient<T> {
 	}
 
 	/**
+	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+	 */
+	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+		final ActorGateway jobManager;
+		try {
+			jobManager = getJobManagerGateway();
+		} catch (FlinkException e) {
+			throw new RuntimeException("Could not retrieve JobManage gateway.", e);
+		}
+
+		Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId),
timeout);
+
+		CompletableFuture<Object> javaFuture = FutureUtils.toJava(response);
+
+		return javaFuture.thenApply((responseMessage) -> {
+			if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) {
+				return ((JobManagerMessages.CurrentJobStatus) responseMessage).status();
+			} else if (responseMessage instanceof JobManagerMessages.JobNotFound) {
+				throw new CompletionException(
+					new IllegalStateException("Could not find job with JobId " + jobId));
+			} else {
+				throw new CompletionException(
+					new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+			}
+		});
+	}
+
+	/**
 	 * Cancels a job identified by the job id.
 	 * @param jobId the job id
 	 * @throws Exception In case an error occurred.

http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index bbb5d49..7475071 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -137,6 +137,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 		throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
 	}
 
+	@Override
 	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
 		return guardWithSingleRetry(() -> miniCluster.getJobStatus(jobId), scheduledExecutor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 18ff099..8cf0d2c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -61,6 +62,8 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
@@ -254,6 +257,17 @@ public class RestClusterClient<T> extends ClusterClient<T>
{
 		}
 	}
 
+	@Override
+	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+		JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+		final JobMessageParameters  params = new JobMessageParameters();
+		params.jobPathParameter.resolve(jobId);
+
+		CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
+
+		return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
+	}
+
 	/**
 	 * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
 	 * times to poll the {@link JobResult} before giving up.


Mime
View raw message