flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [07/14] flink git commit: [FLINK-8347] [flip6] Make cluster id used by ClusterDescriptor typesafe
Date Fri, 12 Jan 2018 22:25:28 GMT
[FLINK-8347] [flip6] Make cluster id used by ClusterDescriptor typesafe

The ClusterDescriptor uses a typed cluster id for the ClusterClient retrieval.
Moreover, the ClusterClient and the CustomCommandLine are typed accordingly.

This closes #5232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38d37208
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38d37208
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38d37208

Branch: refs/heads/master
Commit: 38d3720863c6187153174d0df57fc414b0cf8e96
Parents: 2ce64e7
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Jan 11 23:46:33 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Jan 12 16:14:04 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java |   2 +-
 .../client/cli/AbstractCustomCommandLine.java   |   2 +-
 .../apache/flink/client/cli/CliFrontend.java    | 462 ++++++++++---------
 .../flink/client/cli/CliFrontendParser.java     |  14 +-
 .../flink/client/cli/CustomCommandLine.java     |   6 +-
 .../org/apache/flink/client/cli/DefaultCLI.java |  10 +-
 .../flink/client/cli/Flip6DefaultCLI.java       |  10 +-
 .../client/deployment/ClusterDescriptor.java    |  20 +-
 .../deployment/ClusterRetrieveException.java    |  41 ++
 .../Flip6StandaloneClusterDescriptor.java       |  16 +-
 .../deployment/StandaloneClusterDescriptor.java |  12 +-
 .../client/deployment/StandaloneClusterId.java  |  32 ++
 .../flink/client/program/ClusterClient.java     |  10 +-
 .../client/program/ContextEnvironment.java      |   6 +-
 .../program/ContextEnvironmentFactory.java      |   4 +-
 .../client/program/DetachedEnvironment.java     |   2 +-
 .../client/program/StandaloneClusterClient.java |   8 +-
 .../client/program/rest/RestClusterClient.java  |  27 +-
 .../flink/client/cli/CliFrontendCancelTest.java |  10 +-
 .../flink/client/cli/CliFrontendListTest.java   |   6 +-
 .../client/cli/CliFrontendSavepointTest.java    |  18 +-
 .../flink/client/cli/CliFrontendStopTest.java   |  48 +-
 .../apache/flink/client/cli/DefaultCLITest.java |  10 +-
 .../client/cli/util/DummyClusterDescriptor.java |  16 +-
 .../client/cli/util/DummyCustomCommandLine.java |  10 +-
 .../client/program/ClientConnectionTest.java    |   4 +-
 .../apache/flink/client/program/ClientTest.java |   8 +-
 .../flink/client/program/ClusterClientTest.java |  16 +-
 .../program/rest/RestClusterClientTest.java     |   5 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   5 +-
 .../org/apache/flink/runtime/rpc/RpcServer.java |   2 +-
 .../org/apache/flink/api/scala/FlinkShell.scala |   7 +-
 .../environment/RemoteStreamEnvironment.java    |   3 +-
 .../test/example/client/JobRetrievalITCase.java |   5 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   5 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  35 +-
 .../apache/flink/yarn/YarnClusterClient.java    |  13 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  28 +-
 .../flink/yarn/AbstractYarnClusterTest.java     |  31 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  12 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |   9 +-
 42 files changed, 528 insertions(+), 464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 1ae9b07..fcf8bab 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -58,7 +58,7 @@ public class RemoteExecutor extends PlanExecutor {
 
 	private final Configuration clientConfiguration;
 
-	private ClusterClient client;
+	private ClusterClient<?> client;
 
 	private int defaultParallelism = 1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index da21556..59046bf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -38,7 +38,7 @@ import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConf
  * a ZooKeeper namespace.
  *
  */
-public abstract class AbstractCustomCommandLine implements CustomCommandLine {
+public abstract class AbstractCustomCommandLine<T> implements CustomCommandLine<T> {
 
 	protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true,
 		"Namespace to create the Zookeeper sub-paths for high availability mode");

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 847a5f8..d661aa9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -76,9 +76,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -106,7 +104,7 @@ public class CliFrontend {
 
 	private final Configuration configuration;
 
-	private final List<CustomCommandLine> customCommandLines;
+	private final List<CustomCommandLine<?>> customCommandLines;
 
 	private final Options customCommandLineOptions;
 
@@ -116,7 +114,7 @@ public class CliFrontend {
 
 	public CliFrontend(
 			Configuration configuration,
-			List<CustomCommandLine> customCommandLines) throws Exception {
+			List<CustomCommandLine<?>> customCommandLines) throws Exception {
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
 
@@ -129,7 +127,7 @@ public class CliFrontend {
 
 		this.customCommandLineOptions = new Options();
 
-		for (CustomCommandLine customCommandLine : customCommandLines) {
+		for (CustomCommandLine<?> customCommandLine : customCommandLines) {
 			customCommandLine.addGeneralOptions(customCommandLineOptions);
 			customCommandLine.addRunOptions(customCommandLineOptions);
 		}
@@ -196,14 +194,22 @@ public class CliFrontend {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
 		}
 
-		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
+		runProgram(customCommandLine, commandLine, runOptions, program);
+	}
+
+	private <T> void runProgram(
+			CustomCommandLine<T> customCommandLine,
+			CommandLine commandLine,
+			RunOptions runOptions,
+			PackagedProgram program) throws ProgramInvocationException, FlinkException {
+		final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
 
 		try {
-			final String clusterId = customCommandLine.getClusterId(commandLine);
+			final T clusterId = customCommandLine.getClusterId(commandLine);
 
-			final ClusterClient client;
+			final ClusterClient<T> client;
 
 			if (clusterId != null) {
 				client = clusterDescriptor.retrieve(clusterId);
@@ -235,7 +241,7 @@ public class CliFrontend {
 				if (clusterId == null && !client.isDetached()) {
 					// terminate the cluster only if we have started it before and if it's not detached
 					try {
-						clusterDescriptor.terminateCluster(client.getClusterIdentifier());
+						clusterDescriptor.terminateCluster(client.getClusterId());
 					} catch (FlinkException e) {
 						LOG.info("Could not properly terminate the Flink cluster.", e);
 					}
@@ -351,100 +357,86 @@ public class CliFrontend {
 			return;
 		}
 
-		boolean running = listOptions.getRunning();
-		boolean scheduled = listOptions.getScheduled();
+		final boolean running;
+		final boolean scheduled;
 
 		// print running and scheduled jobs if not option supplied
-		if (!running && !scheduled) {
+		if (!listOptions.getRunning() && !listOptions.getScheduled()) {
 			running = true;
 			scheduled = true;
+		} else {
+			running = listOptions.getRunning();
+			scheduled = listOptions.getScheduled();
 		}
 
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
-		final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final String clusterId = activeCommandLine.getClusterId(commandLine);
-
-		if (clusterId == null) {
-			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
-				"you would like to connect.");
-		}
+		runClusterAction(
+			activeCommandLine,
+			commandLine,
+			clusterClient -> listJobs(clusterClient, running, scheduled));
 
-		final ClusterClient client = clusterDescriptor.retrieve(clusterId);
+	}
 
+	private <T> void listJobs(
+			ClusterClient<T> clusterClient,
+			boolean running,
+			boolean scheduled) throws FlinkException {
+		Collection<JobStatusMessage> jobDetails;
 		try {
-			Collection<JobStatusMessage> jobDetails;
-			try {
-				CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = client.listJobs();
+			CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
 
-				try {
-					logAndSysout("Waiting for response...");
-					jobDetails = jobDetailsFuture.get();
-				}
-				catch (ExecutionException ee) {
-					Throwable cause = ExceptionUtils.stripExecutionException(ee);
-					throw new Exception("Failed to retrieve job list.", cause);
-				}
-			} finally {
-				client.shutdown();
-			}
+			logAndSysout("Waiting for response...");
+			jobDetails = jobDetailsFuture.get();
 
-			LOG.info("Successfully retrieved list of jobs");
+		} catch (Exception e) {
+			Throwable cause = ExceptionUtils.stripExecutionException(e);
+			throw new FlinkException("Failed to retrieve job list.", cause);
+		}
 
-			SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
-			Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());
+		LOG.info("Successfully retrieved list of jobs");
 
-			final List<JobStatusMessage> runningJobs = new ArrayList<>();
-			final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
-			jobDetails.forEach(details -> {
-				if (details.getJobState() == JobStatus.CREATED) {
-					scheduledJobs.add(details);
-				} else {
-					runningJobs.add(details);
-				}
-			});
+		SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
+		Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());
 
-			if (running) {
-				if (runningJobs.size() == 0) {
-					System.out.println("No running jobs.");
-				}
-				else {
-					runningJobs.sort(startTimeComparator);
+		final List<JobStatusMessage> runningJobs = new ArrayList<>();
+		final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
+		jobDetails.forEach(details -> {
+			if (details.getJobState() == JobStatus.CREATED) {
+				scheduledJobs.add(details);
+			} else {
+				runningJobs.add(details);
+			}
+		});
 
-					System.out.println("------------------ Running/Restarting Jobs -------------------");
-					for (JobStatusMessage runningJob : runningJobs) {
-						System.out.println(dateFormat.format(new Date(runningJob.getStartTime()))
-							+ " : " + runningJob.getJobId() + " : " + runningJob.getJobName() + " (" + runningJob.getJobState() + ")");
-					}
-					System.out.println("--------------------------------------------------------------");
-				}
+		if (running) {
+			if (runningJobs.size() == 0) {
+				System.out.println("No running jobs.");
 			}
-			if (scheduled) {
-				if (scheduledJobs.size() == 0) {
-					System.out.println("No scheduled jobs.");
-				}
-				else {
-					scheduledJobs.sort(startTimeComparator);
+			else {
+				runningJobs.sort(startTimeComparator);
 
-					System.out.println("----------------------- Scheduled Jobs -----------------------");
-					for (JobStatusMessage scheduledJob : scheduledJobs) {
-						System.out.println(dateFormat.format(new Date(scheduledJob.getStartTime()))
-							+ " : " + scheduledJob.getJobId() + " : " + scheduledJob.getJobName());
-					}
-					System.out.println("--------------------------------------------------------------");
+				System.out.println("------------------ Running/Restarting Jobs -------------------");
+				for (JobStatusMessage runningJob : runningJobs) {
+					System.out.println(dateFormat.format(new Date(runningJob.getStartTime()))
+						+ " : " + runningJob.getJobId() + " : " + runningJob.getJobName() + " (" + runningJob.getJobState() + ")");
 				}
+				System.out.println("--------------------------------------------------------------");
 			}
-		} finally {
-			try {
-				client.shutdown();
-			} catch (Exception e) {
-				LOG.info("Could not properly shut down the client.", e);
+		}
+		if (scheduled) {
+			if (scheduledJobs.size() == 0) {
+				System.out.println("No scheduled jobs.");
 			}
+			else {
+				scheduledJobs.sort(startTimeComparator);
 
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
+				System.out.println("----------------------- Scheduled Jobs -----------------------");
+				for (JobStatusMessage scheduledJob : scheduledJobs) {
+					System.out.println(dateFormat.format(new Date(scheduledJob.getStartTime()))
+						+ " : " + scheduledJob.getJobId() + " : " + scheduledJob.getJobName());
+				}
+				System.out.println("--------------------------------------------------------------");
 			}
 		}
 	}
@@ -477,41 +469,26 @@ public class CliFrontend {
 		if (stopArgs.length > 0) {
 			String jobIdString = stopArgs[0];
 			jobId = parseJobId(jobIdString);
-		}
-		else {
+		} else {
 			throw new CliArgsException("Missing JobID");
 		}
 
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
-
-		final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final String clusterId = activeCommandLine.getClusterId(commandLine);
-
-		if (clusterId == null) {
-			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
-				"you would like to connect.");
-		}
+		logAndSysout("Stopping job " + jobId + '.');
 
-		final ClusterClient client = clusterDescriptor.retrieve(clusterId);
-
-		try {
-			logAndSysout("Stopping job " + jobId + '.');
-			client.stop(jobId);
-			logAndSysout("Stopped job " + jobId + '.');
-		} finally {
-			try {
-				client.shutdown();
-			} catch (Exception e) {
-				LOG.info("Could not properly shut down the client.", e);
-			}
+		runClusterAction(
+			activeCommandLine,
+			commandLine,
+			clusterClient -> {
+				try {
+					clusterClient.stop(jobId);
+				} catch (Exception e) {
+					throw new FlinkException("Could not stop the job " + jobId + '.', e);
+				}
+			});
 
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
-		}
+		logAndSysout("Stopped job " + jobId + '.');
 	}
 
 	/**
@@ -536,71 +513,63 @@ public class CliFrontend {
 			return;
 		}
 
-		String[] cleanedArgs = cancelOptions.getArgs();
+		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-		boolean withSavepoint = cancelOptions.isWithSavepoint();
-		String targetDirectory = cancelOptions.getSavepointTargetDirectory();
+		final String[] cleanedArgs = cancelOptions.getArgs();
 
-		JobID jobId;
+		if (cancelOptions.isWithSavepoint()) {
+			final JobID jobId;
+			final String targetDirectory;
 
-		// Figure out jobID. This is a little overly complicated, because
-		// we have to figure out whether the optional target directory
-		// is set:
-		// - cancel -s <jobID> => default target dir (JobID parsed as opt arg)
-		// - cancel -s <targetDir> <jobID> => custom target dir (parsed correctly)
-		if (cleanedArgs.length > 0) {
-			String jobIdString = cleanedArgs[0];
+			if (cleanedArgs.length > 0) {
+				jobId = parseJobId(cleanedArgs[0]);
+				targetDirectory = cancelOptions.getSavepointTargetDirectory();
+			} else {
+				jobId = parseJobId(cancelOptions.getSavepointTargetDirectory());
+				targetDirectory = null;
+			}
 
-			jobId = parseJobId(jobIdString);
-		} else if (targetDirectory != null)  {
-			// Try this for case: cancel -s <jobID> (default savepoint target dir)
-			String jobIdString = targetDirectory;
-			targetDirectory = null;
+			if (targetDirectory == null) {
+				logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
+			} else {
+				logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.');
+			}
 
-			jobId = parseJobId(jobIdString);
+			runClusterAction(
+				activeCommandLine,
+				commandLine,
+				clusterClient -> {
+					final String savepointPath;
+					try {
+						savepointPath = clusterClient.cancelWithSavepoint(jobId, targetDirectory);
+					} catch (Exception e) {
+						throw new FlinkException("Could not cancel job " + jobId + '.', e);
+					}
+					logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.');
+				});
 		} else {
-			throw new CliArgsException("Missing JobID in the command line arguments.");
-		}
-
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
-
-		final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+			final JobID jobId;
 
-		final String clusterId = activeCommandLine.getClusterId(commandLine);
-
-		if (clusterId == null) {
-			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
-				"you would like to connect.");
-		}
-
-		final ClusterClient client = clusterDescriptor.retrieve(clusterId);
-
-		try {
-			if (withSavepoint) {
-				if (targetDirectory == null) {
-					logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
-				} else {
-					logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.');
-				}
-				String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory);
-				logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.');
+			if (cleanedArgs.length > 0) {
+				jobId = parseJobId(cleanedArgs[0]);
 			} else {
-				logAndSysout("Cancelling job " + jobId + '.');
-				client.cancel(jobId);
-				logAndSysout("Cancelled job " + jobId + '.');
-			}
-		} finally {
-			try {
-				client.shutdown();
-			} catch (Exception e) {
-				LOG.info("Could not properly shut down the client.", e);
+				throw new CliArgsException("Missing JobID. Specify a JobID to cancel a job.");
 			}
 
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
+			logAndSysout("Cancelling job " + jobId + '.');
+
+			runClusterAction(
+				activeCommandLine,
+				commandLine,
+				clusterClient -> {
+					try {
+						clusterClient.cancel(jobId);
+					} catch (Exception e) {
+						throw new FlinkException("Could not cancel job " + jobId + '.', e);
+					}
+				});
+
+			logAndSysout("Cancelled job " + jobId + '.');
 		}
 	}
 
@@ -626,69 +595,52 @@ public class CliFrontend {
 			return;
 		}
 
-		CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
-
-		final String clusterId = customCommandLine.getClusterId(commandLine);
+		if (savepointOptions.isDispose()) {
+			runClusterAction(
+				activeCommandLine,
+				commandLine,
+				clusterClient -> disposeSavepoint(clusterClient, savepointOptions.getSavepointPath()));
+		} else {
+			String[] cleanedArgs = savepointOptions.getArgs();
 
-		if (clusterId == null) {
-			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
-				"you would like to connect.");
-		}
+			final JobID jobId;
 
-		final ClusterClient clusterClient = clusterDescriptor.retrieve(clusterId);
+			if (cleanedArgs.length >= 1) {
+				String jobIdString = cleanedArgs[0];
 
-		try {
-			if (savepointOptions.isDispose()) {
-				// Discard
-				disposeSavepoint(clusterClient, savepointOptions.getSavepointPath());
+				jobId = parseJobId(jobIdString);
 			} else {
-				// Trigger
-				String[] cleanedArgs = savepointOptions.getArgs();
-				JobID jobId;
-
-				if (cleanedArgs.length >= 1) {
-					String jobIdString = cleanedArgs[0];
-
-					jobId = parseJobId(jobIdString);
-				} else {
-					throw new CliArgsException("Error: The value for the Job ID is not a valid ID. " +
-						"Specify a Job ID to trigger a savepoint.");
-				}
-
-				String savepointDirectory = null;
-				if (cleanedArgs.length >= 2) {
-					savepointDirectory = cleanedArgs[1];
-				}
-
-				// Print superfluous arguments
-				if (cleanedArgs.length >= 3) {
-					logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
-				}
-
-				triggerSavepoint(clusterClient, jobId, savepointDirectory);
+				throw new CliArgsException("Missing JobID. " +
+					"Specify a Job ID to trigger a savepoint.");
 			}
-		} finally {
-			try {
-				clusterClient.shutdown();
-			} catch (Exception e) {
-				LOG.info("Could not shutdown the cluster client.", e);
+
+			final String savepointDirectory;
+			if (cleanedArgs.length >= 2) {
+				savepointDirectory = cleanedArgs[1];
+			} else {
+				savepointDirectory = null;
 			}
 
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
+			// Print superfluous arguments
+			if (cleanedArgs.length >= 3) {
+				logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
 			}
+
+			runClusterAction(
+				activeCommandLine,
+				commandLine,
+				clusterClient -> triggerSavepoint(clusterClient, jobId, savepointDirectory));
 		}
+
 	}
 
 	/**
 	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
 	 * message to the job manager.
 	 */
-	private String triggerSavepoint(ClusterClient clusterClient, JobID jobId, String savepointDirectory) throws FlinkException {
+	private String triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory) throws FlinkException {
 		logAndSysout("Triggering savepoint for job " + jobId + '.');
 		CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId, savepointDirectory);
 
@@ -713,7 +665,7 @@ public class CliFrontend {
 	/**
 	 * Sends a {@link JobManagerMessages.DisposeSavepoint} message to the job manager.
 	 */
-	private void disposeSavepoint(ClusterClient clusterClient, String savepointPath) throws FlinkException {
+	private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath) throws FlinkException {
 		Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
 			"Usage: bin/flink savepoint -d <savepoint-path>");
 
@@ -736,7 +688,7 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
+	protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
 		logAndSysout("Starting execution of program");
 
 		final JobSubmissionResult result = client.run(program, parallelism);
@@ -860,7 +812,7 @@ public class CliFrontend {
 			System.err.println(t.getCause().getMessage());
 			StackTraceElement[] trace = t.getCause().getStackTrace();
 			for (StackTraceElement ele: trace) {
-				System.err.println("\t" + ele.toString());
+				System.err.println("\t" + ele);
 				if (ele.getMethodName().equals("main")) {
 					break;
 				}
@@ -890,6 +842,65 @@ public class CliFrontend {
 		return jobId;
 	}
 
+	/**
+	 * Retrieves the {@link ClusterClient} from the given {@link CustomCommandLine} and runs the given
+	 * {@link ClusterAction} against it.
+	 *
+	 * @param activeCommandLine to create the {@link ClusterDescriptor} from
+	 * @param commandLine containing the parsed command line options
+	 * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
+	 * @param <T> type of the cluster id
+	 * @throws FlinkException if something goes wrong
+	 */
+	private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, CommandLine commandLine, ClusterAction<T> clusterAction) throws FlinkException {
+		final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+
+		final T clusterId = activeCommandLine.getClusterId(commandLine);
+
+		if (clusterId == null) {
+			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
+				"you would like to connect.");
+		} else {
+			try {
+				final ClusterClient<T> clusterClient = clusterDescriptor.retrieve(clusterId);
+
+				try {
+					clusterAction.runAction(clusterClient);
+				} finally {
+					try {
+						clusterClient.shutdown();
+					} catch (Exception e) {
+						LOG.info("Could not properly shut down the cluster client.", e);
+					}
+				}
+			} finally {
+				try {
+					clusterDescriptor.close();
+				} catch (Exception e) {
+					LOG.info("Could not properly close the cluster descriptor.", e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Internal interface to encapsulate cluster actions which are executed via
+	 * the {@link ClusterClient}.
+	 *
+	 * @param <T> tyoe pf the cluster id
+	 */
+	@FunctionalInterface
+	private interface ClusterAction<T> {
+
+		/**
+		 * Run the cluster action with the given {@link ClusterClient}.
+		 *
+		 * @param clusterClient to run the cluster action against
+		 * @throws FlinkException if something goes wrong
+		 */
+		void runAction(ClusterClient<T> clusterClient) throws FlinkException;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Entry point for executable
 	// --------------------------------------------------------------------------------------------
@@ -981,7 +992,7 @@ public class CliFrontend {
 		final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
 
 		// 3. load the custom command lines
-		final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
+		final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
 			configuration,
 			configurationDirectory);
 
@@ -992,12 +1003,7 @@ public class CliFrontend {
 
 			SecurityUtils.install(new SecurityConfiguration(cli.configuration));
 			int retCode = SecurityUtils.getInstalledContext()
-					.runSecured(new Callable<Integer>() {
-						@Override
-						public Integer call() {
-							return cli.parseParameters(args);
-						}
-					});
+					.runSecured(() -> cli.parseParameters(args));
 			System.exit(retCode);
 		}
 		catch (Throwable t) {
@@ -1048,8 +1054,8 @@ public class CliFrontend {
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
 	}
 
-	public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
-		List<CustomCommandLine> customCommandLines = new ArrayList<>(2);
+	public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
+		List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
 
 		//	Command line interface of the YARN session, with a special initialization here
 		//	to prefix all options with y/yarn.
@@ -1082,8 +1088,8 @@ public class CliFrontend {
 	 * @param commandLine The input to the command-line.
 	 * @return custom command-line which is active (may only be one at a time)
 	 */
-	public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
-		for (CustomCommandLine cli : customCommandLines) {
+	public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
+		for (CustomCommandLine<?> cli : customCommandLines) {
 			if (cli.isActive(commandLine)) {
 				return cli;
 			}
@@ -1096,7 +1102,7 @@ public class CliFrontend {
 	 * @param className The fully-qualified class name to load.
 	 * @param params The constructor parameters
 	 */
-	private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
+	private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
 
 		Class<? extends CustomCommandLine> customCliClass =
 			Class.forName(className).asSubclass(CustomCommandLine.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 10507d6..475d854 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -236,7 +236,7 @@ public class CliFrontendParser {
 	/**
 	 * Prints the help for the client.
 	 */
-	public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
+	public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
 		System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
 		System.out.println();
 		System.out.println("The following actions are available:");
@@ -251,7 +251,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
+	public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -279,7 +279,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
+	public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -294,7 +294,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
+	public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -309,7 +309,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
+	public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -324,7 +324,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
+	public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -345,7 +345,7 @@ public class CliFrontendParser {
 	 * @param runOptions True if the run options should be printed, False to print only general options
 	 */
 	private static void printCustomCliOptions(
-			Collection<CustomCommandLine> customCommandLines,
+			Collection<CustomCommandLine<?>> customCommandLines,
 			HelpFormatter formatter,
 			boolean runOptions) {
 		// prints options from all available command-line classes

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index aabc224..e939974 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
 /**
  * Custom command-line interface to load hooks for the command-line interface.
  */
-public interface CustomCommandLine {
+public interface CustomCommandLine<T> {
 
 	/**
 	 * Signals whether the custom command-line wants to execute or not.
@@ -66,7 +66,7 @@ public interface CustomCommandLine {
 	 * @return ClusterDescriptor
 	 * @throws FlinkException if the ClusterDescriptor could not be created
 	 */
-	ClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException;
+	ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) throws FlinkException;
 
 	/**
 	 * Returns the cluster id if a cluster id was specified on the command line, otherwise it
@@ -79,7 +79,7 @@ public interface CustomCommandLine {
 	 * @return Cluster id identifying the cluster to deploy jobs to or null
 	 */
 	@Nullable
-	String getClusterId(CommandLine commandLine);
+	T getClusterId(CommandLine commandLine);
 
 	/**
 	 * Returns the {@link ClusterSpecification} specified by the configuration and the command

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 5660765..d2694b8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkException;
 
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
 /**
  * The default CLI which is used for interaction with standalone clusters.
  */
-public class DefaultCLI extends AbstractCustomCommandLine {
+public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
 
 	public DefaultCLI(Configuration configuration) {
 		super(configuration);
@@ -49,7 +49,7 @@ public class DefaultCLI extends AbstractCustomCommandLine {
 	}
 
 	@Override
-	public ClusterDescriptor createClusterDescriptor(
+	public StandaloneClusterDescriptor createClusterDescriptor(
 			CommandLine commandLine) throws FlinkException {
 		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
 
@@ -58,8 +58,8 @@ public class DefaultCLI extends AbstractCustomCommandLine {
 
 	@Override
 	@Nullable
-	public String getClusterId(CommandLine commandLine) {
-		return "standalone";
+	public StandaloneClusterId getClusterId(CommandLine commandLine) {
+		return StandaloneClusterId.getInstance();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
index 1a75aac..c8fab72 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkException;
 
@@ -33,7 +33,7 @@ import javax.annotation.Nullable;
 /**
  * The default CLI which is used for interaction with standalone clusters.
  */
-public class Flip6DefaultCLI extends AbstractCustomCommandLine {
+public class Flip6DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
 
 	public static final Option FLIP_6 = new Option("flip6", "Switches the client to Flip-6 mode.");
 
@@ -62,7 +62,7 @@ public class Flip6DefaultCLI extends AbstractCustomCommandLine {
 	}
 
 	@Override
-	public ClusterDescriptor createClusterDescriptor(
+	public Flip6StandaloneClusterDescriptor createClusterDescriptor(
 			CommandLine commandLine) throws FlinkException {
 		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
 
@@ -71,8 +71,8 @@ public class Flip6DefaultCLI extends AbstractCustomCommandLine {
 
 	@Override
 	@Nullable
-	public String getClusterId(CommandLine commandLine) {
-		return "flip6Standalone";
+	public StandaloneClusterId getClusterId(CommandLine commandLine) {
+		return StandaloneClusterId.getInstance();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index b1f566c..aadb5fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -24,8 +24,10 @@ import org.apache.flink.util.FlinkException;
 
 /**
  * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
+ *
+ * @param <T> Type of the cluster id
  */
-public interface ClusterDescriptor extends AutoCloseable {
+public interface ClusterDescriptor<T> extends AutoCloseable {
 
 	/**
 	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...).
@@ -35,19 +37,19 @@ public interface ClusterDescriptor extends AutoCloseable {
 
 	/**
 	 * Retrieves an existing Flink Cluster.
-	 * @param applicationID The unique application identifier of the running cluster
+	 * @param clusterId The unique identifier of the running cluster
 	 * @return Client for the cluster
-	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
+	 * @throws ClusterRetrieveException if the cluster client could not be retrieved
 	 */
-	ClusterClient retrieve(String applicationID) throws UnsupportedOperationException;
+	ClusterClient<T> retrieve(T clusterId) throws ClusterRetrieveException;
 
 	/**
 	 * Triggers deployment of a cluster.
 	 * @param clusterSpecification Cluster specification defining the cluster to deploy
 	 * @return Client for the cluster
-	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
+	 * @throws ClusterDeploymentException if the cluster could not be deployed
 	 */
-	ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
+	ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException;
 
 	/**
 	 * Deploys a per-job cluster with the given job on the cluster.
@@ -57,9 +59,9 @@ public interface ClusterDescriptor extends AutoCloseable {
 	 * @return Cluster client to talk to the Flink cluster
 	 * @throws ClusterDeploymentException if the cluster could not be deployed
 	 */
-	ClusterClient deployJobCluster(
+	ClusterClient<T> deployJobCluster(
 		final ClusterSpecification clusterSpecification,
-		final JobGraph jobGraph);
+		final JobGraph jobGraph) throws ClusterDeploymentException;
 
 	/**
 	 * Terminates the cluster with the given cluster id.
@@ -67,5 +69,5 @@ public interface ClusterDescriptor extends AutoCloseable {
 	 * @param clusterId identifying the cluster to shut down
 	 * @throws FlinkException if the cluster could not be terminated
 	 */
-	void terminateCluster(String clusterId) throws FlinkException;
+	void terminateCluster(T clusterId) throws FlinkException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterRetrieveException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterRetrieveException.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterRetrieveException.java
new file mode 100644
index 0000000..7234d1b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterRetrieveException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which indicates that a cluster could not be retrieved.
+ */
+public class ClusterRetrieveException extends FlinkException {
+
+	private static final long serialVersionUID = 7718062507419172318L;
+
+	public ClusterRetrieveException(String message) {
+		super(message);
+	}
+
+	public ClusterRetrieveException(Throwable cause) {
+		super(cause);
+	}
+
+	public ClusterRetrieveException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index 70fd9f7..8096da8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Preconditions;
 /**
  * A deployment descriptor for an existing cluster.
  */
-public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
 
 	private final Configuration config;
 
@@ -44,27 +44,27 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
-	public RestClusterClient retrieve(String applicationID) {
+	public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
 		try {
-			return new RestClusterClient(config);
+			return new RestClusterClient<>(config, standaloneClusterId);
 		} catch (Exception e) {
-			throw new RuntimeException("Couldn't retrieve FLIP-6 standalone cluster", e);
+			throw new ClusterRetrieveException("Couldn't retrieve FLIP-6 standalone cluster", e);
 		}
 	}
 
 	@Override
-	public RestClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
+	public RestClusterClient<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) {
 		throw new UnsupportedOperationException("Can't deploy a FLIP-6 standalone cluster.");
 	}
 
 	@Override
-	public RestClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
+	public RestClusterClient<StandaloneClusterId> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Can't deploy a standalone FLIP-6 per-job cluster.");
 	}
 
 	@Override
-	public void terminateCluster(String clusterId) throws FlinkException {
-		throw new UnsupportedOperationException("Cannot terminate a standalone Flip-6 cluster.");
+	public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
+		throw new UnsupportedOperationException("Cannot terminate a Flip-6 standalone cluster.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 5638232..62908fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.FlinkException;
 /**
  * A deployment descriptor for an existing cluster.
  */
-public class StandaloneClusterDescriptor implements ClusterDescriptor {
+public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
 
 	private final Configuration config;
 
@@ -43,16 +43,16 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
-	public StandaloneClusterClient retrieve(String applicationID) {
+	public StandaloneClusterClient retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
 		try {
 			return new StandaloneClusterClient(config);
 		} catch (Exception e) {
-			throw new RuntimeException("Couldn't retrieve standalone cluster", e);
+			throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e);
 		}
 	}
 
 	@Override
-	public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
+	public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
 		throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
 	}
 
@@ -62,8 +62,8 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
-	public void terminateCluster(String clusterId) throws FlinkException {
-		throw new UnsupportedOperationException("Cannot terminate a standalone cluster.");
+	public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
+		throw new UnsupportedOperationException("Cannot terminate standalone clusters.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
new file mode 100644
index 0000000..7201f1d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+/**
+ * Identifier for standalone clusters.
+ */
+public class StandaloneClusterId {
+	private static final StandaloneClusterId INSTANCE = new StandaloneClusterId();
+
+	private StandaloneClusterId() {}
+
+	public static StandaloneClusterId getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index efa23fb..b992e60 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -93,8 +93,10 @@ import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
+ *
+ * @param <T> type of the cluster id
  */
-public abstract class ClusterClient {
+public abstract class ClusterClient<T> {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -937,9 +939,11 @@ public abstract class ClusterClient {
 	public abstract List<String> getNewMessages();
 
 	/**
-	 * Returns a string representation of the cluster.
+	 * Returns the cluster id identifying the cluster to which the client is connected.
+	 *
+	 * @return cluster id of the connected cluster
 	 */
-	public abstract String getClusterIdentifier();
+	public abstract T getClusterId();
 
 	/**
 	 * Set the mode of this client (detached or blocking job execution).

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 7e47825..13f49d3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -35,7 +35,7 @@ import java.util.List;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	protected final ClusterClient client;
+	protected final ClusterClient<?> client;
 
 	protected final List<URL> jarFilesToAttach;
 
@@ -45,7 +45,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
 	protected final SavepointRestoreSettings savepointSettings;
 
-	public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
+	public ContextEnvironment(ClusterClient<?> remoteConnection, List<URL> jarFiles, List<URL> classpaths,
 				ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
@@ -84,7 +84,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 				+ ") : " + getIdString();
 	}
 
-	public ClusterClient getClient() {
+	public ClusterClient<?> getClient() {
 		return this.client;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index 6209254..64b1863 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -33,7 +33,7 @@ import java.util.List;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final ClusterClient client;
+	private final ClusterClient<?> client;
 
 	private final List<URL> jarFilesToAttach;
 
@@ -49,7 +49,7 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
 	private SavepointRestoreSettings savepointSettings;
 
-	public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
+	public ContextEnvironmentFactory(ClusterClient<?> client, List<URL> jarFilesToAttach,
 			List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
 			boolean isDetached, SavepointRestoreSettings savepointSettings) {
 		this.client = client;

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index 63aa811..9c7b639 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -44,7 +44,7 @@ public class DetachedEnvironment extends ContextEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
 
 	public DetachedEnvironment(
-			ClusterClient remoteConnection,
+			ClusterClient<?> remoteConnection,
 			List<URL> jarFiles,
 			List<URL> classpaths,
 			ClassLoader userCodeClassLoader,

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 1782a25..df08c30 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
@@ -38,7 +39,7 @@ import scala.concurrent.Future;
  * Cluster client for communication with an standalone (on-premise) cluster or an existing cluster that has been
  * brought up independently of a specific job.
  */
-public class StandaloneClusterClient extends ClusterClient {
+public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId> {
 
 	public StandaloneClusterClient(Configuration config) throws Exception {
 		super(config);
@@ -81,9 +82,8 @@ public class StandaloneClusterClient extends ClusterClient {
 	}
 
 	@Override
-	public String getClusterIdentifier() {
-		// Avoid blocking here by getting the address from the config without resolving the address
-		return "Standalone cluster with JobManager at " + this.getJobManagerAddress();
+	public StandaloneClusterId getClusterId() {
+		return StandaloneClusterId.getInstance();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6bff1a..3e8b136 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
 
 import javax.annotation.Nullable;
@@ -83,23 +84,31 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
  */
-public class RestClusterClient extends ClusterClient {
+public class RestClusterClient<T> extends ClusterClient<T> {
 
 	private final RestClusterClientConfiguration restClusterClientConfiguration;
+
 	private final RestClient restClient;
+
 	private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
 	private final WaitStrategy waitStrategy;
 
-	public RestClusterClient(Configuration config) throws Exception {
-		this(config, new ExponentialWaitStrategy(10, 2000));
+	private final T clusterId;
+
+	public RestClusterClient(Configuration config, T clusterId) throws Exception {
+		this(
+			config,
+			clusterId,
+			new ExponentialWaitStrategy(10L, 2000L));
 	}
 
 	@VisibleForTesting
-	RestClusterClient(Configuration configuration, WaitStrategy waitStrategy) throws Exception {
+	RestClusterClient(Configuration configuration, T clusterId, WaitStrategy waitStrategy) throws Exception {
 		super(configuration);
 		this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
 		this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
 		this.waitStrategy = requireNonNull(waitStrategy);
+		this.clusterId = Preconditions.checkNotNull(clusterId);
 	}
 
 	@Override
@@ -295,16 +304,16 @@ public class RestClusterClient extends ClusterClient {
 			});
 	}
 
+	@Override
+	public T getClusterId() {
+		return clusterId;
+	}
+
 	// ======================================
 	// Legacy stuff we actually implement
 	// ======================================
 
 	@Override
-	public String getClusterIdentifier() {
-		return "Flip-6 Standalone cluster with dispatcher at " + restClusterClientConfiguration.getRestServerAddress() + '.';
-	}
-
-	@Override
 	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
 		return false;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
index 60bd308..b2fa003 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
@@ -52,7 +52,7 @@ public class CliFrontendCancelTest extends TestLogger {
 		JobID jid = new JobID();
 
 		String[] parameters = { jid.toString() };
-		final ClusterClient clusterClient = createClusterClient();
+		final ClusterClient<String> clusterClient = createClusterClient();
 		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 
 		testFrontend.cancel(parameters);
@@ -90,7 +90,7 @@ public class CliFrontendCancelTest extends TestLogger {
 			JobID jid = new JobID();
 
 			String[] parameters = { "-s", jid.toString() };
-			final ClusterClient clusterClient = createClusterClient();
+			final ClusterClient<String> clusterClient = createClusterClient();
 			MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 			testFrontend.cancel(parameters);
 
@@ -103,7 +103,7 @@ public class CliFrontendCancelTest extends TestLogger {
 			JobID jid = new JobID();
 
 			String[] parameters = { "-s", "targetDirectory", jid.toString() };
-			final ClusterClient clusterClient = createClusterClient();
+			final ClusterClient<String> clusterClient = createClusterClient();
 			MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 			testFrontend.cancel(parameters);
 
@@ -134,8 +134,8 @@ public class CliFrontendCancelTest extends TestLogger {
 		testFrontend.cancel(parameters);
 	}
 
-	private static ClusterClient createClusterClient() throws Exception {
-		final ClusterClient clusterClient = mock(ClusterClient.class);
+	private static ClusterClient<String> createClusterClient() throws Exception {
+		final ClusterClient<String> clusterClient = mock(ClusterClient.class);
 
 		return clusterClient;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
index 77d8016..760b376 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
@@ -49,7 +49,7 @@ public class CliFrontendListTest extends TestLogger {
 		// test list properly
 		{
 			String[] parameters = {"-r", "-s"};
-			ClusterClient clusterClient = createClusterClient();
+			ClusterClient<String> clusterClient = createClusterClient();
 			MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 			testFrontend.list(parameters);
 			Mockito.verify(clusterClient, times(1))
@@ -67,8 +67,8 @@ public class CliFrontendListTest extends TestLogger {
 		testFrontend.list(parameters);
 	}
 
-	private static ClusterClient createClusterClient() throws Exception {
-		final ClusterClient clusterClient = mock(ClusterClient.class);
+	private static ClusterClient<String> createClusterClient() throws Exception {
+		final ClusterClient<String> clusterClient = mock(ClusterClient.class);
 
 		when(clusterClient.listJobs()).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
index 7b31a3e..d730344 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
@@ -82,7 +82,7 @@ public class CliFrontendSavepointTest extends TestLogger {
 
 		String savepointPath = "expectedSavepointPath";
 
-		final ClusterClient clusterClient = createClusterClient(savepointPath);
+		final ClusterClient<String> clusterClient = createClusterClient(savepointPath);
 
 		try {
 			MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
@@ -110,7 +110,7 @@ public class CliFrontendSavepointTest extends TestLogger {
 		String expectedTestException = "expectedTestException";
 		Exception testException = new Exception(expectedTestException);
 
-		final ClusterClient clusterClient = createFailingClusterClient(testException);
+		final ClusterClient<String> clusterClient = createFailingClusterClient(testException);
 
 		try {
 			MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
@@ -165,7 +165,7 @@ public class CliFrontendSavepointTest extends TestLogger {
 
 		String savepointDirectory = "customTargetDirectory";
 
-		final ClusterClient clusterClient = createClusterClient(savepointDirectory);
+		final ClusterClient<String> clusterClient = createClusterClient(savepointDirectory);
 
 		try {
 			MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
@@ -224,7 +224,7 @@ public class CliFrontendSavepointTest extends TestLogger {
 
 		final CompletableFuture<String> disposeSavepointFuture = new CompletableFuture<>();
 
-		final ClusterClient clusterClient = new DisposeSavepointClusterClient(
+		final DisposeSavepointClusterClient clusterClient = new DisposeSavepointClusterClient(
 			(String savepointPath, Time timeout) -> {
 				disposeSavepointFuture.complete(savepointPath);
 				return CompletableFuture.completedFuture(Acknowledge.get());
@@ -260,7 +260,7 @@ public class CliFrontendSavepointTest extends TestLogger {
 
 		Exception testException = new Exception("expectedTestException");
 
-		ClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> FutureUtils.completedExceptionally(testException));
+		DisposeSavepointClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> FutureUtils.completedExceptionally(testException));
 
 		try {
 			CliFrontend frontend = new MockedCliFrontend(clusterClient);
@@ -313,8 +313,8 @@ public class CliFrontendSavepointTest extends TestLogger {
 		System.setErr(stdErr);
 	}
 
-	private static ClusterClient createClusterClient(String expectedResponse) throws Exception {
-		final ClusterClient clusterClient = mock(ClusterClient.class);
+	private static ClusterClient<String> createClusterClient(String expectedResponse) throws Exception {
+		final ClusterClient<String> clusterClient = mock(ClusterClient.class);
 
 		when(clusterClient.triggerSavepoint(any(JobID.class), anyString()))
 			.thenReturn(CompletableFuture.completedFuture(expectedResponse));
@@ -322,8 +322,8 @@ public class CliFrontendSavepointTest extends TestLogger {
 		return clusterClient;
 	}
 
-	private static ClusterClient createFailingClusterClient(Exception expectedException) throws Exception {
-		final ClusterClient clusterClient = mock(ClusterClient.class);
+	private static ClusterClient<String> createFailingClusterClient(Exception expectedException) throws Exception {
+		final ClusterClient<String> clusterClient = mock(ClusterClient.class);
 
 		when(clusterClient.triggerSavepoint(any(JobID.class), anyString()))
 			.thenReturn(FutureUtils.completedExceptionally(expectedException));

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
index 0120cdf..d6049e5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.util.MockedCliFrontend;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -29,9 +30,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 
 import static org.apache.flink.client.cli.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -51,18 +55,16 @@ public class CliFrontendStopTest extends TestLogger {
 	@Test
 	public void testStop() throws Exception {
 		// test stop properly
-		{
-			JobID jid = new JobID();
-			String jidString = jid.toString();
+		JobID jid = new JobID();
+		String jidString = jid.toString();
 
-			String[] parameters = { jidString };
-			final ClusterClient clusterClient = createClusterClient(false);
-			MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
+		String[] parameters = { jidString };
+		final ClusterClient<String> clusterClient = createClusterClient(null);
+		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 
-			testFrontend.stop(parameters);
+		testFrontend.stop(parameters);
 
-			Mockito.verify(clusterClient, times(1)).stop(any(JobID.class));
-		}
+		Mockito.verify(clusterClient, times(1)).stop(any(JobID.class));
 	}
 
 	@Test(expected = CliArgsException.class)
@@ -87,32 +89,30 @@ public class CliFrontendStopTest extends TestLogger {
 		testFrontend.stop(parameters);
 	}
 
-	@Test(expected = TestException.class)
+	@Test
 	public void testUnknownJobId() throws Exception {
 		// test unknown job Id
 		JobID jid = new JobID();
 
 		String[] parameters = { jid.toString() };
-		final ClusterClient clusterClient = createClusterClient(true);
+		String expectedMessage = "Test exception";
+		FlinkException testException = new FlinkException(expectedMessage);
+		final ClusterClient<String> clusterClient = createClusterClient(testException);
 		MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
 
-		testFrontend.stop(parameters);
-		fail("Should have failed.");
-	}
-
-	private static final class TestException extends FlinkException {
-		private static final long serialVersionUID = -2650760898729937583L;
-
-		TestException(String message) {
-			super(message);
+		try {
+			testFrontend.stop(parameters);
+			fail("Should have failed.");
+		} catch (FlinkException e) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(e, expectedMessage).isPresent());
 		}
 	}
 
-	private static ClusterClient createClusterClient(boolean reject) throws Exception {
-		final ClusterClient clusterClient = mock(ClusterClient.class);
+	private static ClusterClient<String> createClusterClient(@Nullable Exception exception) throws Exception {
+		final ClusterClient<String> clusterClient = mock(ClusterClient.class);
 
-		if (reject) {
-				doThrow(new TestException("Test Exception")).when(clusterClient).stop(any(JobID.class));
+		if (exception != null) {
+			doThrow(exception).when(clusterClient).stop(any(JobID.class));
 		}
 
 		return clusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index 6eb005d..aaca798 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -62,9 +62,9 @@ public class DefaultCLITest extends TestLogger {
 
 		final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port);
 
-		final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
 
-		final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
 
 		Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress());
 	}
@@ -89,9 +89,9 @@ public class DefaultCLITest extends TestLogger {
 
 		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
 
-		final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
 
-		final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
 
 		final InetSocketAddress expectedAddress = new InetSocketAddress(manualHostname, manualPort);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index 2bb3ed0..0ceb95e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -28,11 +28,11 @@ import org.apache.flink.util.Preconditions;
 /**
  * Dummy {@link ClusterDescriptor} implementation for testing purposes.
  */
-public class DummyClusterDescriptor implements ClusterDescriptor {
+public class DummyClusterDescriptor<T> implements ClusterDescriptor<T> {
 
-	private final ClusterClient clusterClient;
+	private final ClusterClient<T> clusterClient;
 
-	public DummyClusterDescriptor(ClusterClient clusterClient) {
+	public DummyClusterDescriptor(ClusterClient<T> clusterClient) {
 		this.clusterClient = Preconditions.checkNotNull(clusterClient);
 	}
 
@@ -42,23 +42,23 @@ public class DummyClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
-	public ClusterClient retrieve(String applicationID) throws UnsupportedOperationException {
+	public ClusterClient<T> retrieve(T clusterId) {
 		return clusterClient;
 	}
 
 	@Override
-	public ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
+	public ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) {
 		return clusterClient;
 	}
 
 	@Override
-	public ClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
+	public ClusterClient<T> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		return clusterClient;
 	}
 
 	@Override
-	public void terminateCluster(String clusterId) throws FlinkException {
-		throw new UnsupportedOperationException("DummyClusterDescriptor does not support cluster termination.");
+	public void terminateCluster(T clusterId) throws FlinkException {
+		throw new UnsupportedOperationException("Cannot terminate a dummy cluster.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
index 5279d85..12bea74 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
@@ -32,10 +32,10 @@ import javax.annotation.Nullable;
 /**
  * Dummy implementation of the {@link CustomCommandLine} for testing purposes.
  */
-public class DummyCustomCommandLine implements CustomCommandLine {
-	private final ClusterClient clusterClient;
+public class DummyCustomCommandLine<T> implements CustomCommandLine {
+	private final ClusterClient<T> clusterClient;
 
-	public DummyCustomCommandLine(ClusterClient clusterClient) {
+	public DummyCustomCommandLine(ClusterClient<T> clusterClient) {
 		this.clusterClient = Preconditions.checkNotNull(clusterClient);
 	}
 
@@ -60,8 +60,8 @@ public class DummyCustomCommandLine implements CustomCommandLine {
 	}
 
 	@Override
-	public ClusterDescriptor createClusterDescriptor(CommandLine commandLine) {
-		return new DummyClusterDescriptor(clusterClient);
+	public ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) {
+		return new DummyClusterDescriptor<>(clusterClient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index c2505ae..72767c7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -101,7 +101,7 @@ public class ClientConnectionTest extends TestLogger {
 		config.setString(JobManagerOptions.ADDRESS, unreachableEndpoint.getHostName());
 		config.setInteger(JobManagerOptions.PORT, unreachableEndpoint.getPort());
 
-		ClusterClient client = new StandaloneClusterClient(config);
+		StandaloneClusterClient client = new StandaloneClusterClient(config);
 
 		try {
 			// we have to query the cluster status to start the connection attempts
@@ -140,7 +140,7 @@ public class ClientConnectionTest extends TestLogger {
 
 			highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
 
-			ClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);
+			StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);
 
 			ActorGateway gateway = client.getJobManagerGateway();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 99f51ad..ca8b6fb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -131,7 +131,7 @@ public class ClientTest extends TestLogger {
 		jobManagerSystem.actorOf(
 			Props.create(SuccessReturningActor.class),
 			JobMaster.JOB_MANAGER_NAME);
-		ClusterClient out = new StandaloneClusterClient(config);
+		StandaloneClusterClient out = new StandaloneClusterClient(config);
 		out.setDetached(true);
 
 		try {
@@ -204,7 +204,7 @@ public class ClientTest extends TestLogger {
 			Props.create(SuccessReturningActor.class),
 			JobMaster.JOB_MANAGER_NAME);
 
-		ClusterClient out = new StandaloneClusterClient(config);
+		StandaloneClusterClient out = new StandaloneClusterClient(config);
 		out.setDetached(true);
 		JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);
 
@@ -222,7 +222,7 @@ public class ClientTest extends TestLogger {
 				Props.create(FailureReturningActor.class),
 				JobMaster.JOB_MANAGER_NAME);
 
-		ClusterClient out = new StandaloneClusterClient(config);
+		StandaloneClusterClient out = new StandaloneClusterClient(config);
 		out.setDetached(true);
 
 		try {
@@ -259,7 +259,7 @@ public class ClientTest extends TestLogger {
 			}).when(packagedProgramMock).invokeInteractiveModeForExecution();
 
 			try {
-				ClusterClient client = new StandaloneClusterClient(config);
+				StandaloneClusterClient client = new StandaloneClusterClient(config);
 				client.setDetached(true);
 				client.run(packagedProgramMock, 1);
 				fail("Creating the local execution environment should not be possible");

http://git-wip-us.apache.org/repos/asf/flink/blob/38d37208/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 7b34d4a..e2eb88d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -71,7 +71,7 @@ public class ClusterClientTest extends TestLogger {
 		Configuration config = new Configuration();
 		HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
 
-		ClusterClient clusterClient = new StandaloneClusterClient(config, highAvailabilityServices);
+		StandaloneClusterClient clusterClient = new StandaloneClusterClient(config, highAvailabilityServices);
 
 		clusterClient.shutdown();
 
@@ -87,7 +87,7 @@ public class ClusterClientTest extends TestLogger {
 
 		JobID jobID = new JobID();
 		TestStopActorGateway gateway = new TestStopActorGateway(jobID);
-		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		TestClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
 			clusterClient.stop(jobID);
 			Assert.assertTrue(gateway.messageArrived);
@@ -103,7 +103,7 @@ public class ClusterClientTest extends TestLogger {
 
 		JobID jobID = new JobID();
 		TestCancelActorGateway gateway = new TestCancelActorGateway(jobID);
-		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		TestClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
 			clusterClient.cancel(jobID);
 			Assert.assertTrue(gateway.messageArrived);
@@ -121,7 +121,7 @@ public class ClusterClientTest extends TestLogger {
 		String savepointDirectory = "/test/directory";
 		String savepointPath = "/test/path";
 		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointDirectory, savepointPath);
-		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		TestClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
 			String path = clusterClient.cancelWithSavepoint(jobID, savepointDirectory);
 			Assert.assertTrue(gateway.messageArrived);
@@ -140,7 +140,7 @@ public class ClusterClientTest extends TestLogger {
 		String savepointDirectory = "/test/directory";
 		String savepointPath = "/test/path";
 		TestSavepointActorGateway gateway = new TestSavepointActorGateway(jobID, savepointDirectory, savepointPath);
-		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		TestClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
 			CompletableFuture<String> pathFuture = clusterClient.triggerSavepoint(jobID, savepointDirectory);
 			Assert.assertTrue(gateway.messageArrived);
@@ -156,7 +156,7 @@ public class ClusterClientTest extends TestLogger {
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 
 		TestListActorGateway gateway = new TestListActorGateway();
-		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		TestClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
 			CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
 			Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
@@ -179,7 +179,7 @@ public class ClusterClientTest extends TestLogger {
 
 		final ActorGateway jobManagerGateway = new TestDisposeWithWrongResponseActorGateway();
 
-		final ClusterClient clusterClient = new TestClusterClient(configuration, jobManagerGateway);
+		final TestClusterClient clusterClient = new TestClusterClient(configuration, jobManagerGateway);
 
 		CompletableFuture<Acknowledge> acknowledgeCompletableFuture = clusterClient.disposeSavepoint(savepointPath, timeout);
 
@@ -201,7 +201,7 @@ public class ClusterClientTest extends TestLogger {
 
 		final ActorGateway jobManagerGateway = new TestDisposeWithClassNotFoundExceptionActorGateway();
 
-		final ClusterClient clusterClient = new TestClusterClient(configuration, jobManagerGateway);
+		final TestClusterClient clusterClient = new TestClusterClient(configuration, jobManagerGateway);
 
 		CompletableFuture<Acknowledge> acknowledgeCompletableFuture = clusterClient.disposeSavepoint(savepointPath, timeout);
 


Mime
View raw message