flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/3] flink git commit: [hotfix] Introduce NewClusterClient interface
Date Tue, 03 Apr 2018 14:36:21 GMT
[hotfix] Introduce NewClusterClient interface

The NewClusterClient interface contains asynchronous submitJob and requestJobResult
methods which will replace the ClusterClient#submitJob method.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12096207
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12096207
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12096207

Branch: refs/heads/release-1.5
Commit: 12096207614179dce0af0d44de6e1b0e27c60e02
Parents: dffbf41
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Mar 27 09:20:33 2018 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Apr 3 16:35:59 2018 +0200

----------------------------------------------------------------------
 .../flink/client/program/MiniClusterClient.java | 45 ++++++++++++---
 .../flink/client/program/NewClusterClient.java  | 50 ++++++++++++++++
 .../client/program/rest/RestClusterClient.java  | 27 +++++----
 .../flink/runtime/minicluster/MiniCluster.java  | 60 ++++++++++++--------
 4 files changed, 138 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index a135359..faf96ec 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,23 +21,25 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,11 +47,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Client to interact with a {@link MiniCluster}.
  */
-public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId>
{
+public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId>
implements NewClusterClient {
 
 	private final MiniCluster miniCluster;
 
@@ -66,28 +69,54 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
 	@Override
 	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws
ProgramInvocationException {
+		final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);
+
 		if (isDetached()) {
 			try {
-				miniCluster.runDetached(jobGraph);
-			} catch (JobExecutionException | InterruptedException e) {
+				return jobSubmissionResultFuture.get();
+			} catch (InterruptedException | ExecutionException e) {
+				ExceptionUtils.checkInterrupted(e);
+
 				throw new ProgramInvocationException(
 					String.format("Could not run job %s in detached mode.", jobGraph.getJobID()),
 					e);
 			}
-
-			return new JobSubmissionResult(jobGraph.getJobID());
 		} else {
+			final CompletableFuture<JobResult> jobResultFuture = jobSubmissionResultFuture.thenCompose(
+				(JobSubmissionResult ignored) -> requestJobResult(jobGraph.getJobID()));
+
+			final JobResult jobResult;
 			try {
-				return miniCluster.executeJobBlocking(jobGraph);
-			} catch (JobExecutionException | InterruptedException e) {
+				jobResult = jobResultFuture.get();
+			} catch (InterruptedException | ExecutionException e) {
+				ExceptionUtils.checkInterrupted(e);
+
 				throw new ProgramInvocationException(
 					String.format("Could not run job %s.", jobGraph.getJobID()),
 					e);
 			}
+
+			try {
+				return jobResult.toJobExecutionResult(classLoader);
+			} catch (JobResult.WrappedJobException e) {
+				throw new ProgramInvocationException(e.getCause());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new ProgramInvocationException(e);
+			}
 		}
 	}
 
 	@Override
+	public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph)
{
+		return miniCluster.submitJob(jobGraph);
+	}
+
+	@Override
+	public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
+		return miniCluster.requestJobResult(jobId);
+	}
+
+	@Override
 	public void cancel(JobID jobId) throws Exception {
 		miniCluster.cancelJob(jobId).get();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
new file mode 100644
index 0000000..513f7da
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/NewClusterClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for the new cluster client.
+ */
+public interface NewClusterClient {
+
+	/**
+	 * Submit the given {@link JobGraph} to the cluster.
+	 *
+	 * @param jobGraph to submit
+	 * @return Future which is completed with the {@link JobSubmissionResult}
+	 */
+	CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph);
+
+	/**
+	 * Request the {@link JobResult} for the given {@link JobID}.
+	 *
+	 * @param jobId for which to request the {@link JobResult}
+	 * @return Future which is completed with the {@link JobResult}
+	 */
+	CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index cf68374..4a4f993 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.NewClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
@@ -101,6 +102,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 
 import akka.actor.AddressFromURIString;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -126,7 +128,7 @@ import java.util.stream.Collectors;
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
  */
-public class RestClusterClient<T> extends ClusterClient<T> {
+public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient
{
 
 	private final RestClusterClientConfiguration restClusterClientConfiguration;
 
@@ -235,16 +237,14 @@ public class RestClusterClient<T> extends ClusterClient<T>
{
 	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws
ProgramInvocationException {
 		log.info("Submitting job {}.", jobGraph.getJobID());
 
-		final CompletableFuture<JobSubmitResponseBody> jobSubmissionFuture = submitJob(jobGraph);
+		final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
 
 		if (isDetached()) {
 			try {
-				jobSubmissionFuture.get();
+				return jobSubmissionFuture.get();
 			} catch (Exception e) {
 				throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() +
'.', ExceptionUtils.stripExecutionException(e));
 			}
-
-			return new JobSubmissionResult(jobGraph.getJobID());
 		} else {
 			final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
 				ignored -> requestJobResult(jobGraph.getJobID()));
@@ -286,7 +286,8 @@ public class RestClusterClient<T> extends ClusterClient<T>
{
 	 * @return Future which is completed with the {@link JobResult} once the job has completed
or
 	 * with a failure if the {@link JobResult} could not be retrieved.
 	 */
-	public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+	@Override
+	public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
 		return pollResourceAsync(
 			() -> {
 				final JobMessageParameters messageParameters = new JobMessageParameters();
@@ -305,7 +306,8 @@ public class RestClusterClient<T> extends ClusterClient<T>
{
 	 * @param jobGraph to submit
 	 * @return Future which is completed with the submission response
 	 */
-	public CompletableFuture<JobSubmitResponseBody> submitJob(JobGraph jobGraph) {
+	@Override
+	public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph)
{
 		// we have to enable queued scheduling because slot will be allocated lazily
 		jobGraph.setAllowQueuedScheduling(true);
 
@@ -346,10 +348,13 @@ public class RestClusterClient<T> extends ClusterClient<T>
{
 				}
 			});
 
-		return submissionFuture.exceptionally(
-			(Throwable throwable) -> {
-				throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed
to submit JobGraph.", throwable));
-			});
+		return submissionFuture
+			.thenApply(
+				(JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
+			.exceptionally(
+				(Throwable throwable) -> {
+					throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed
to submit JobGraph.", throwable));
+				});
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12096207/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 59e5ff0..2e826eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -573,18 +574,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync
{
 	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException
{
 		checkNotNull(job, "job is null");
 
-		final DispatcherGateway currentDispatcherGateway;
-		try {
-			currentDispatcherGateway = getDispatcherGateway();
-		} catch (LeaderRetrievalException e) {
-			throw new JobExecutionException(job.getJobID(), e);
-		}
-
-		// we have to allow queued scheduling in the new mode because we need to request slots
-		// from the ResourceManager
-		job.setAllowQueuedScheduling(true);
-
-		final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job,
rpcTimeout);
+		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		try {
 			submissionFuture.get();
@@ -607,21 +597,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync
{
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException,
InterruptedException {
 		checkNotNull(job, "job is null");
 
-		final DispatcherGateway currentDispatcherGateway;
-		try {
-			currentDispatcherGateway = getDispatcherGateway();
-		} catch (LeaderRetrievalException e) {
-			throw new JobExecutionException(job.getJobID(), e);
-		}
-
-		// we have to allow queued scheduling in the new mode because we need to request slots
-		// from the ResourceManager
-		job.setAllowQueuedScheduling(true);
-
-		final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job,
rpcTimeout);
+		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
-			(Acknowledge ack) -> currentDispatcherGateway.requestJobResult(job.getJobID(), RpcUtils.INF_TIMEOUT));
+			(JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
 
 		final JobResult jobResult;
 
@@ -640,6 +619,37 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync
{
 		}
 	}
 
+	public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
+		final DispatcherGateway dispatcherGateway;
+		try {
+			dispatcherGateway = getDispatcherGateway();
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			ExceptionUtils.checkInterrupted(e);
+			return FutureUtils.completedExceptionally(e);
+		}
+
+		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
+		// from the ResourceManager
+		jobGraph.setAllowQueuedScheduling(true);
+
+		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph,
rpcTimeout);
+
+		return acknowledgeCompletableFuture.thenApply(
+			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
+	}
+
+	public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+		final DispatcherGateway dispatcherGateway;
+		try {
+			dispatcherGateway = getDispatcherGateway();
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			ExceptionUtils.checkInterrupted(e);
+			return FutureUtils.completedExceptionally(e);
+		}
+
+		return dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT);
+	}
+
 	private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException
{
 		synchronized (lock) {
 			checkState(running, "MiniCluster is not yet running.");


Mime
View raw message