flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [02/12] flink git commit: [FLINK-8811] [flip6] Implement MiniClusterClient#getJobStatus
Date Fri, 02 Mar 2018 07:54:34 GMT
[FLINK-8811] [flip6] Implement MiniClusterClient#getJobStatus


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19e4f68b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19e4f68b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19e4f68b

Branch: refs/heads/release-1.5
Commit: 19e4f68ba9cfbf5d0f54b325db4c5d196d262d09
Parents: f30ca21
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Feb 28 17:49:29 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 2 08:53:48 2018 +0100

----------------------------------------------------------------------
 .../flink/client/program/MiniClusterClient.java   |  7 ++++++-
 .../flink/runtime/minicluster/MiniCluster.java    | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19e4f68b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 5baae5b..e99addd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -57,7 +58,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 	}
 
 	@Override
-	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws
ProgramInvocationException {
+	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws
ProgramInvocationException {
 		if (isDetached()) {
 			try {
 				miniCluster.runDetached(jobGraph);
@@ -119,6 +120,10 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 		throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
 	}
 
+	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+		return miniCluster.getJobStatus(jobId);
+	}
+
 	@Override
 	public MiniClusterClient.MiniClusterId getClusterId() {
 		return MiniClusterId.INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/19e4f68b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index cbfb266..5b086ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -63,6 +65,7 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetrie
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
@@ -455,6 +458,21 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync
{
 	}
 
 	// ------------------------------------------------------------------------
+	//  Accessing jobs
+	// ------------------------------------------------------------------------
+
+	public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+		try {
+			return getDispatcherGateway().requestJobStatus(jobId, rpcTimeout);
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Could not retrieve job status for job %s", jobId),
+					e));
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  running jobs
 	// ------------------------------------------------------------------------
 


Mime
View raw message