flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [17/22] flink git commit: [FLINK-8530] [flip6] Enable detached job mode submission to session cluster
Date Thu, 15 Feb 2018 10:43:00 GMT
[FLINK-8530] [flip6] Enable detached job mode submission to session cluster

This commit makes the RestClusterClient aware of whether the user wishes to submit
a job in detached or non-detached mode. If the job is submitted in detached mode, the
RestClusterClient does not poll for the execution result.

This closes #5466.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f57b9833
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f57b9833
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f57b9833

Branch: refs/heads/master
Commit: f57b9833aaf7649f79b72efd4a033fbfb4f8e218
Parents: 39df56d
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Feb 12 16:33:15 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 15 08:45:18 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 67 +++++++++++---------
 .../program/rest/RestClusterClientTest.java     | 49 ++++++++++----
 2 files changed, 74 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f57b9833/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 18cd5c0..2f53545 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -96,7 +96,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -197,40 +196,46 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		log.info("Submitting job.");
-		try {
-			submitJob(jobGraph).get();
-		} catch (InterruptedException | ExecutionException e) {
-			throw new ProgramInvocationException(ExceptionUtils.stripExecutionException(e));
-		}
+		log.info("Submitting job {}.", jobGraph.getJobID());
 
-		final CompletableFuture<JobResult> jobResultFuture = requestJobResult(jobGraph.getJobID());
+		final CompletableFuture<JobSubmitResponseBody> jobSubmissionFuture = submitJob(jobGraph);
 
-		final JobResult jobResult;
-		try {
-			jobResult = jobResultFuture.get();
-		} catch (Exception e) {
-			throw new ProgramInvocationException(e);
-		}
+		if (isDetached()) {
+			try {
+				jobSubmissionFuture.get();
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() + '.', ExceptionUtils.stripExecutionException(e));
+			}
 
-		if (jobResult.getSerializedThrowable().isPresent()) {
-			final SerializedThrowable serializedThrowable = jobResult.getSerializedThrowable().get();
-			final Throwable throwable = serializedThrowable.deserializeError(classLoader);
-			throw new ProgramInvocationException(throwable);
-		}
+			return new JobSubmissionResult(jobGraph.getJobID());
+		} else {
+			final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
+				ignored -> requestJobResult(jobGraph.getJobID()));
 
-		try {
-			// don't return just a JobSubmissionResult here, the signature is lying
-			// The CliFrontend expects this to be a JobExecutionResult
-			this.lastJobExecutionResult = new JobExecutionResult(
-				jobResult.getJobId(),
-				jobResult.getNetRuntime(),
-				AccumulatorHelper.deserializeAccumulators(
-					jobResult.getAccumulatorResults(),
-					classLoader));
-			return lastJobExecutionResult;
-		} catch (IOException | ClassNotFoundException e) {
-			throw new ProgramInvocationException(e);
+			final JobResult jobResult;
+			try {
+				jobResult = jobResultFuture.get();
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e));
+			}
+
+			if (jobResult.getSerializedThrowable().isPresent()) {
+				final SerializedThrowable serializedThrowable = jobResult.getSerializedThrowable().get();
+				final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+				throw new ProgramInvocationException(throwable);
+			}
+
+			try {
+				this.lastJobExecutionResult = new JobExecutionResult(
+					jobResult.getJobId(),
+					jobResult.getNetRuntime(),
+					AccumulatorHelper.deserializeAccumulators(
+						jobResult.getAccumulatorResults(),
+						classLoader));
+				return lastJobExecutionResult;
+			} catch (IOException | ClassNotFoundException e) {
+				throw new ProgramInvocationException(e);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f57b9833/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index c880817..9e75eea 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program.rest;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.StandaloneClusterId;
@@ -115,6 +116,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -147,6 +150,9 @@ public class RestClusterClientTest extends TestLogger {
 
 	private ExecutorService executor;
 
+	private JobGraph jobGraph;
+	private JobID jobId;
+
 	@Before
 	public void setUp() throws Exception {
 		MockitoAnnotations.initMocks(this);
@@ -178,6 +184,9 @@ public class RestClusterClientTest extends TestLogger {
 			}
 		};
 		restClusterClient = new RestClusterClient<>(config, restClient, StandaloneClusterId.getInstance(), (attempt) -> 0);
+
+		jobGraph = new JobGraph("testjob");
+		jobId = jobGraph.getJobID();
 	}
 
 	@After
@@ -193,16 +202,13 @@ public class RestClusterClientTest extends TestLogger {
 
 	@Test
 	public void testJobSubmitCancelStop() throws Exception {
-		final JobGraph job = new JobGraph("testjob");
-		final JobID id = job.getJobID();
-
 		TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
 		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
 		TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
 		TestJobExecutionResultHandler testJobExecutionResultHandler =
 			new TestJobExecutionResultHandler(
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
-					.jobId(id)
+					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.build()));
 
@@ -214,20 +220,44 @@ public class RestClusterClientTest extends TestLogger {
 
 			Assert.assertFalse(portHandler.portRetrieved);
 			Assert.assertFalse(submitHandler.jobSubmitted);
-			restClusterClient.submitJob(job, ClassLoader.getSystemClassLoader());
+			restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
 			Assert.assertTrue(portHandler.portRetrieved);
 			Assert.assertTrue(submitHandler.jobSubmitted);
 
 			Assert.assertFalse(terminationHandler.jobCanceled);
-			restClusterClient.cancel(id);
+			restClusterClient.cancel(jobId);
 			Assert.assertTrue(terminationHandler.jobCanceled);
 
 			Assert.assertFalse(terminationHandler.jobStopped);
-			restClusterClient.stop(id);
+			restClusterClient.stop(jobId);
 			Assert.assertTrue(terminationHandler.jobStopped);
 		}
 	}
 
+	/**
+	 * Tests that we can submit a jobGraph in detached mode.
+	 */
+	@Test
+	public void testDetachedJobSubmission() throws Exception {
+
+		final TestBlobServerPortHandler testBlobServerPortHandler = new TestBlobServerPortHandler();
+		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
+
+		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+			testBlobServerPortHandler,
+			testJobSubmitHandler)) {
+
+			restClusterClient.setDetached(true);
+			final JobSubmissionResult jobSubmissionResult = restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+
+			// if the detached mode didn't work, then we would not reach this point because the execution result
+			// retrieval would have failed.
+			assertThat(jobSubmissionResult, is(not(instanceOf(JobExecutionResult.class))));
+			assertThat(jobSubmissionResult.getJobID(), is(jobId));
+		}
+
+	}
+
 	private class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
 		private volatile boolean portRetrieved = false;
 
@@ -314,9 +344,6 @@ public class RestClusterClientTest extends TestLogger {
 
 	@Test
 	public void testSubmitJobAndWaitForExecutionResult() throws Exception {
-		final JobGraph jobGraph = new JobGraph("testjob");
-		final JobID jobId = jobGraph.getJobID();
-
 		final TestJobExecutionResultHandler testJobExecutionResultHandler =
 			new TestJobExecutionResultHandler(
 				new RestHandlerException("should trigger retry", HttpResponseStatus.NOT_FOUND),
@@ -509,7 +536,7 @@ public class RestClusterClientTest extends TestLogger {
 				Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
 				JobStatusMessage job1 = jobDetailsIterator.next();
 				JobStatusMessage job2 = jobDetailsIterator.next();
-				Assert.assertNotEquals("The job statues should not be equal.", job1.getJobState(), job2.getJobState());
+				Assert.assertNotEquals("The job status should not be equal.", job1.getJobState(), job2.getJobState());
 			}
 		}
 	}


Mime
View raw message