From commits-return-16183-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Mar 2 08:54:35 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id EB48418062F for ; Fri, 2 Mar 2018 08:54:34 +0100 (CET) Received: (qmail 27666 invoked by uid 500); 2 Mar 2018 07:54:33 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 27627 invoked by uid 99); 2 Mar 2018 07:54:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Mar 2018 07:54:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB2C9E38C5; Fri, 2 Mar 2018 07:54:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Fri, 02 Mar 2018 07:54:34 -0000 Message-Id: <43afa859872045b5951cef6b16642078@git.apache.org> In-Reply-To: <22550b6cb97e4aa487b6b55ea459da0c@git.apache.org> References: <22550b6cb97e4aa487b6b55ea459da0c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/12] flink git commit: [FLINK-8811] [flip6] Implement MiniClusterClient#getJobStatus [FLINK-8811] [flip6] Implement MiniClusterClient#getJobStatus Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19e4f68b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19e4f68b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19e4f68b Branch: refs/heads/release-1.5 Commit: 19e4f68ba9cfbf5d0f54b325db4c5d196d262d09 Parents: f30ca21 Author: Till Rohrmann Authored: Wed Feb 28 17:49:29 2018 +0100 Committer: Till Rohrmann Committed: Fri Mar 2 08:53:48 2018 +0100 ---------------------------------------------------------------------- .../flink/client/program/MiniClusterClient.java | 7 ++++++- .../flink/runtime/minicluster/MiniCluster.java | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/19e4f68b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 5baae5b..e99addd 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -57,7 +58,7 @@ public class MiniClusterClient extends ClusterClient getJobStatus(JobID jobId) { + return miniCluster.getJobStatus(jobId); + } + @Override public MiniClusterClient.MiniClusterId getClusterId() { return MiniClusterId.INSTANCE; http://git-wip-us.apache.org/repos/asf/flink/blob/19e4f68b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index cbfb266..5b086ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.minicluster; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -63,6 +65,7 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetrie import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import akka.actor.ActorSystem; import com.typesafe.config.Config; @@ -455,6 +458,21 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } // ------------------------------------------------------------------------ + // Accessing jobs + // ------------------------------------------------------------------------ + + public CompletableFuture getJobStatus(JobID jobId) { + try { + return getDispatcherGateway().requestJobStatus(jobId, rpcTimeout); + } catch (LeaderRetrievalException | InterruptedException e) { + return FutureUtils.completedExceptionally( + new FlinkException( + String.format("Could not retrieve job status for job %s", jobId), + e)); + } + } + + // ------------------------------------------------------------------------ // running jobs // ------------------------------------------------------------------------