flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [4/5] flink git commit: [FLINK-7780] [Client] Move savepoint logic into ClusterClient
Date Wed, 11 Oct 2017 11:53:57 GMT
[FLINK-7780] [Client] Move savepoint logic into ClusterClient


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8e1e330
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8e1e330
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8e1e330

Branch: refs/heads/master
Commit: e8e1e330a62bcdad939c896ab807362cc346278b
Parents: 90eb902
Author: zentol <chesnay@apache.org>
Authored: Mon Oct 9 13:34:52 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Wed Oct 11 11:58:54 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  49 ++++----
 .../flink/client/program/ClusterClient.java     |  34 +++++
 .../flink/client/CliFrontendSavepointTest.java  | 126 ++++++-------------
 .../flink/client/program/ClusterClientTest.java |  58 ++++++++-
 4 files changed, 144 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 9be8295..c065453 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -61,11 +61,11 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -89,16 +89,15 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 
 /**
  * Implementation of a simple command line frontend for executing programs.
@@ -726,35 +725,29 @@ public class CliFrontend {
 	 */
 	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
 		try {
-			ActorGateway jobManager = getJobManagerGateway(options);
-
-			logAndSysout("Triggering savepoint for job " + jobId + ".");
-			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
-					new FiniteDuration(1, TimeUnit.HOURS));
-
-			Object result;
+			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
 			try {
-				logAndSysout("Waiting for response...");
-				result = Await.result(response, FiniteDuration.Inf());
-			}
-			catch (Exception e) {
-				throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
-			}
+				logAndSysout("Triggering savepoint for job " + jobId + ".");
+				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
 
-			if (result instanceof TriggerSavepointSuccess) {
-				TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
-				logAndSysout("Savepoint completed. Path: " + success.savepointPath());
+				String savepointPath;
+				try {
+					logAndSysout("Waiting for response...");
+					savepointPath = savepointPathFuture.get();
+				}
+				catch (ExecutionException ee) {
+					Throwable cause = ExceptionUtils.stripExecutionException(ee);
+					throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
+				}
+
+				logAndSysout("Savepoint completed. Path: " + savepointPath);
 				logAndSysout("You can resume your program from this savepoint with the run command.");
 
 				return 0;
 			}
-			else if (result instanceof TriggerSavepointFailure) {
-				TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
-				throw failure.cause();
-			}
-			else {
-				throw new IllegalStateException("Unknown JobManager response of type " +
-						result.getClass());
+			finally {
+				client.shutdown();
 			}
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 78455c1..eb89f09 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -72,6 +73,9 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 
 import scala.Option;
 import scala.Tuple2;
@@ -650,6 +654,36 @@ public abstract class ClusterClient {
 	}
 
 	/**
+	 * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
+	 * directory, or {@link org.apache.flink.configuration.CoreOptions#SAVEPOINT_DIRECTORY} if it is null.
+	 *
+	 * @param jobId job id
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return path future where the savepoint is located
+	 * @throws Exception if  no connection to the cluster could be established
+	 */
+	public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Future<Object> response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
+			new FiniteDuration(1, TimeUnit.HOURS));
+		CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);
+
+		return responseFuture.thenApply((responseMessage) -> {
+			if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) {
+				JobManagerMessages.TriggerSavepointSuccess success = (JobManagerMessages.TriggerSavepointSuccess) responseMessage;
+				return success.savepointPath();
+			} else if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) {
+				JobManagerMessages.TriggerSavepointFailure failure = (JobManagerMessages.TriggerSavepointFailure) responseMessage;
+				throw new CompletionException(failure.cause());
+			} else {
+				throw new CompletionException(
+					new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+			}
+		});
+	}
+
+	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 	 * requested while a is running or after it has finished. The default class loader is used
 	 * to deserialize the incoming accumulator results.

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index cfed859..1f0d356 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
@@ -33,22 +35,23 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.util.concurrent.CompletableFuture;
 import java.util.zip.ZipOutputStream;
 
-import scala.Option;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -76,31 +79,19 @@ public class CliFrontendSavepointTest {
 
 		try {
 			JobID jobId = new JobID();
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
 
 			String savepointPath = "expectedSavepointPath";
 
-			triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), isNull(String.class));
 
-			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+			assertTrue(buffer.toString().contains(savepointPath));
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -113,29 +104,17 @@ public class CliFrontendSavepointTest {
 
 		try {
 			JobID jobId = new JobID();
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
 
 			Exception testException = new Exception("expectedTestException");
 
-			triggerResponse.success(new TriggerSavepointFailure(jobId, testException));
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(testException);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
-			assertTrue(returnCode != 0);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
+			assertNotEquals(0, returnCode);
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), isNull(String.class));
 
 			assertTrue(buffer.toString().contains("expectedTestException"));
 		}
@@ -162,46 +141,9 @@ public class CliFrontendSavepointTest {
 		}
 	}
 
-	@Test
-	public void testTriggerSavepointFailureUnknownResponse() throws Exception {
-		replaceStdOutAndStdErr();
-
-		try {
-			JobID jobId = new JobID();
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
-			triggerResponse.success("UNKNOWN RESPONSE");
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { jobId.toString() };
-			int returnCode = frontend.savepoint(parameters);
-
-			assertTrue(returnCode != 0);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
-
-			String errMsg = buffer.toString();
-			assertTrue(errMsg.contains("IllegalStateException"));
-			assertTrue(errMsg.contains("Unknown JobManager response"));
-		}
-		finally {
-			restoreStdOutAndStdErr();
-		}
-	}
-
 	/**
 	 * Tests that a CLI call with a custom savepoint directory target is
-	 * forwarded correctly to the JM.
+	 * forwarded correctly to the cluster client.
 	 */
 	@Test
 	public void testTriggerSavepointCustomTarget() throws Exception {
@@ -209,30 +151,19 @@ public class CliFrontendSavepointTest {
 
 		try {
 			JobID jobId = new JobID();
-			Option<String> customTarget = Option.apply("customTargetDirectory");
-			ActorGateway jobManager = mock(ActorGateway.class);
 
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+			String savepointDirectory = "customTargetDirectory";
 
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-			String savepointPath = "expectedSavepointPath";
-			triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory);
 
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { jobId.toString(), customTarget.get() };
+			String[] parameters = { jobId.toString(), savepointDirectory };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), eq(savepointDirectory));
 
-			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+			assertTrue(buffer.toString().contains(savepointDirectory));
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -444,4 +375,17 @@ public class CliFrontendSavepointTest {
 		System.setOut(stdOut);
 		System.setErr(stdErr);
 	}
+
+	private static final class SavepointTestCliFrontend extends MockedCliFrontend {
+
+		SavepointTestCliFrontend(String expectedResponse) throws Exception {
+			when(client.triggerSavepoint(any(JobID.class), anyString()))
+				.thenReturn(CompletableFuture.completedFuture(expectedResponse));
+		}
+
+		SavepointTestCliFrontend(Exception expectedException) throws Exception {
+			when(client.triggerSavepoint(any(JobID.class), anyString()))
+				.thenReturn(FutureUtils.completedExceptionally(expectedException));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index ad34864..5f6d9fe 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+
 import scala.concurrent.Future;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
@@ -100,12 +102,33 @@ public class ClusterClientTest extends TestLogger {
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 
 		JobID jobID = new JobID();
+		String savepointDirectory = "/test/directory";
+		String savepointPath = "/test/path";
+		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointDirectory, savepointPath);
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			String path = clusterClient.cancelWithSavepoint(jobID, savepointDirectory);
+			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(savepointPath, path);
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
+	@Test
+	public void testClusterClientSavepoint() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		JobID jobID = new JobID();
+		String savepointDirectory = "/test/directory";
 		String savepointPath = "/test/path";
-		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath);
+		TestSavepointActorGateway gateway = new TestSavepointActorGateway(jobID, savepointDirectory, savepointPath);
 		ClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
-			clusterClient.cancelWithSavepoint(jobID, savepointPath);
+			CompletableFuture<String> pathFuture = clusterClient.triggerSavepoint(jobID, savepointDirectory);
 			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(savepointPath, pathFuture.get());
 		} finally {
 			clusterClient.shutdown();
 		}
@@ -153,18 +176,45 @@ public class ClusterClientTest extends TestLogger {
 
 		private final JobID expectedJobID;
 		private final String expectedTargetDirectory;
+		private final String savepointPathToReturn;
 
-		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
 			super(JobManagerMessages.CancelJobWithSavepoint.class);
 			this.expectedJobID = expectedJobID;
 			this.expectedTargetDirectory = expectedTargetDirectory;
+			this.savepointPathToReturn = savepointPathToReturn;
 		}
 
 		@Override
 		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
 			Assert.assertEquals(expectedJobID, message.jobID());
 			Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
-			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), savepointPathToReturn);
+		}
+	}
+
+	private static class TestSavepointActorGateway extends TestActorGateway<JobManagerMessages.TriggerSavepoint, JobManagerMessages.TriggerSavepointSuccess> {
+
+		private final JobID expectedJobID;
+		private final String expectedTargetDirectory;
+		private final String savepointPathToReturn;
+
+		private TestSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
+			super(JobManagerMessages.TriggerSavepoint.class);
+			this.expectedJobID = expectedJobID;
+			this.expectedTargetDirectory = expectedTargetDirectory;
+			this.savepointPathToReturn = savepointPathToReturn;
+		}
+
+		@Override
+		public JobManagerMessages.TriggerSavepointSuccess process(JobManagerMessages.TriggerSavepoint message) {
+			Assert.assertEquals(expectedJobID, message.jobId());
+			if (expectedTargetDirectory == null) {
+				Assert.assertTrue(message.savepointDirectory().isEmpty());
+			} else {
+				Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory().get());
+			}
+			return new JobManagerMessages.TriggerSavepointSuccess(message.jobId(), 0, savepointPathToReturn, 0);
 		}
 	}
 


Mime
View raw message