flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [3/3] flink git commit: [FLINK-6719] Activate strict checkstyle for flink-clients
Date Fri, 26 May 2017 06:06:47 GMT
[FLINK-6719] Activate strict checkstyle for flink-clients

This closes #3989.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c793ea41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c793ea41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c793ea41

Branch: refs/heads/master
Commit: c793ea41d88fe84fa97d825728ad95f35e27ef82
Parents: db397b8
Author: zentol <chesnay@apache.org>
Authored: Thu May 25 13:02:36 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri May 26 08:03:14 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  52 ++++---
 .../org/apache/flink/client/ClientUtils.java    |   3 +-
 .../org/apache/flink/client/LocalExecutor.java  |  47 ++++---
 .../org/apache/flink/client/RemoteExecutor.java |  23 ++--
 .../apache/flink/client/cli/CancelOptions.java  |   3 +-
 .../flink/client/cli/CliArgsException.java      |   3 +-
 .../flink/client/cli/CliFrontendParser.java     |  12 +-
 .../flink/client/cli/CommandLineOptions.java    |   3 +-
 .../flink/client/cli/CustomCommandLine.java     |  15 ++-
 .../org/apache/flink/client/cli/DefaultCLI.java |   7 +-
 .../apache/flink/client/cli/InfoOptions.java    |   3 +-
 .../apache/flink/client/cli/ListOptions.java    |   5 +-
 .../apache/flink/client/cli/ProgramOptions.java |   4 +-
 .../org/apache/flink/client/cli/RunOptions.java |   3 +-
 .../flink/client/cli/SavepointOptions.java      |   3 +-
 .../apache/flink/client/cli/StopOptions.java    |   3 +-
 .../client/deployment/ClusterDescriptor.java    |   5 +-
 .../deployment/StandaloneClusterDescriptor.java |   4 +-
 .../flink/client/program/ClusterClient.java     |  88 ++++++------
 .../client/program/ContextEnvironment.java      |  12 +-
 .../program/ContextEnvironmentFactory.java      |   9 +-
 .../client/program/DetachedEnvironment.java     |   4 +
 .../flink/client/program/JobWithJars.java       |  25 ++--
 .../program/OptimizerPlanEnvironment.java       |  15 ++-
 .../flink/client/program/PackagedProgram.java   | 135 +++++++++----------
 .../client/program/PreviewPlanEnvironment.java  |   5 +-
 .../program/ProgramInvocationException.java     |   7 +-
 .../client/program/StandaloneClusterClient.java |   7 +-
 .../CliFrontendAddressConfigurationTest.java    |   2 +-
 .../flink/client/CliFrontendInfoTest.java       |  14 +-
 .../flink/client/CliFrontendListCancelTest.java |  31 +++--
 .../client/CliFrontendPackageProgramTest.java   |  52 +++----
 .../apache/flink/client/CliFrontendRunTest.java |  12 +-
 .../flink/client/CliFrontendSavepointTest.java  |  15 ++-
 .../flink/client/CliFrontendStopTest.java       |  20 ++-
 .../flink/client/CliFrontendTestUtils.java      |  33 +++--
 .../RemoteExecutorHostnameResolutionTest.java   |   6 +-
 .../client/program/ClientConnectionTest.java    |  13 +-
 .../apache/flink/client/program/ClientTest.java |  72 ++++++----
 .../flink/client/program/ClusterClientTest.java |   6 +-
 .../ExecutionPlanAfterExecutionTest.java        |  25 ++--
 .../program/ExecutionPlanCreationTest.java      |  44 +++---
 ...rRetrievalServiceHostnameResolutionTest.java |   5 +-
 .../client/program/PackagedProgramTest.java     |  15 ++-
 .../testjar/JobWithExternalDependency.java      |   5 +-
 .../apache/flink/client/testjar/WordCount.java  |  55 ++++----
 46 files changed, 503 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 62fa402..a22cb37 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client;
 
-import org.apache.commons.cli.CommandLine;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -74,12 +73,10 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+
+import org.apache.commons.cli.CommandLine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -100,6 +97,11 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
 import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
@@ -139,8 +141,6 @@ public class CliFrontend {
 
 	// --------------------------------------------------------------------------------------------
 
-
-
 	private final Configuration config;
 
 	private final FiniteDuration clientTimeout;
@@ -173,13 +173,12 @@ public class CliFrontend {
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Getter & Setter
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Getter which returns a copy of the associated configuration
+	 * Getter which returns a copy of the associated configuration.
 	 *
 	 * @return Copy of the associated configuration
 	 */
@@ -191,14 +190,13 @@ public class CliFrontend {
 		return copiedConfiguration;
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Execute Actions
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Executions the run action.
-	 * 
+	 *
 	 * @param args Command line arguments for the run action.
 	 */
 	protected int run(String[] args) {
@@ -251,7 +249,7 @@ public class CliFrontend {
 			LOG.debug("User parallelism is set to {}", userParallelism);
 			if (client.getMaxSlots() != -1 && userParallelism == -1) {
 				logAndSysout("Using the parallelism provided by the remote cluster ("
-					+ client.getMaxSlots()+"). "
+					+ client.getMaxSlots() + "). "
 					+ "To use another parallelism, set it at the ./bin/flink client.");
 				userParallelism = client.getMaxSlots();
 			}
@@ -277,7 +275,7 @@ public class CliFrontend {
 
 	/**
 	 * Executes the info action.
-	 * 
+	 *
 	 * @param args Command line arguments for the info action.
 	 */
 	protected int info(String[] args) {
@@ -323,7 +321,7 @@ public class CliFrontend {
 
 			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 			FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism);
-			
+
 			String jsonPlan = null;
 			if (flinkPlan instanceof OptimizedPlan) {
 				jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
@@ -361,7 +359,7 @@ public class CliFrontend {
 
 	/**
 	 * Executes the list action.
-	 * 
+	 *
 	 * @param args Command line arguments for the list action.
 	 */
 	protected int list(String[] args) {
@@ -437,12 +435,12 @@ public class CliFrontend {
 				Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){
 					@Override
 					public int compare(JobStatusMessage o1, JobStatusMessage o2) {
-						return (int)(o1.getStartTime()-o2.getStartTime());
+						return (int) (o1.getStartTime() - o2.getStartTime());
 					}
 				};
 
 				if (running) {
-					if(runningJobs.size() == 0) {
+					if (runningJobs.size() == 0) {
 						System.out.println("No running jobs.");
 					}
 					else {
@@ -464,7 +462,7 @@ public class CliFrontend {
 						Collections.sort(scheduledJobs, njec);
 
 						System.out.println("----------------------- Scheduled Jobs -----------------------");
-						for(JobStatusMessage rj : scheduledJobs) {
+						for (JobStatusMessage rj : scheduledJobs) {
 							System.out.println(df.format(new Date(rj.getStartTime()))
 									+ " : " + rj.getJobId() + " : " + rj.getJobName());
 						}
@@ -485,7 +483,7 @@ public class CliFrontend {
 
 	/**
 	 * Executes the STOP action.
-	 * 
+	 *
 	 * @param args Command line arguments for the stop action.
 	 */
 	protected int stop(String[] args) {
@@ -544,7 +542,7 @@ public class CliFrontend {
 
 	/**
 	 * Executes the CANCEL action.
-	 * 
+	 *
 	 * @param args Command line arguments for the cancel action.
 	 */
 	protected int cancel(String[] args) {
@@ -877,8 +875,7 @@ public class CliFrontend {
 	 * @throws org.apache.flink.client.program.ProgramInvocationException
 	 */
 	protected PackagedProgram buildProgram(ProgramOptions options)
-			throws FileNotFoundException, ProgramInvocationException
-	{
+			throws FileNotFoundException, ProgramInvocationException {
 		String[] programArgs = options.getProgramArgs();
 		String jarFilePath = options.getJarFilePath();
 		List<URL> classpaths = options.getClasspaths();
@@ -910,7 +907,7 @@ public class CliFrontend {
 	}
 
 	/**
-	 * Updates the associated configuration with the given command line options
+	 * Updates the associated configuration with the given command line options.
 	 *
 	 * @param options Command line options
 	 */
@@ -1023,7 +1020,7 @@ public class CliFrontend {
 
 	/**
 	 * Displays an exception message.
-	 * 
+	 *
 	 * @param t The exception to display.
 	 * @return The return code for the process.
 	 */
@@ -1061,7 +1058,7 @@ public class CliFrontend {
 
 	/**
 	 * Parses the command line arguments and starts the requested action.
-	 * 
+	 *
 	 * @param args command line arguments of the client.
 	 * @return The return code of the program
 	 */
@@ -1118,7 +1115,7 @@ public class CliFrontend {
 	}
 
 	/**
-	 * Submits the job based on the arguments
+	 * Submits the job based on the arguments.
 	 */
 	public static void main(final String[] args) {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
@@ -1172,9 +1169,8 @@ public class CliFrontend {
 		return location;
 	}
 
-
 	/**
-	 * Writes the given job manager address to the associated configuration object
+	 * Writes the given job manager address to the associated configuration object.
 	 *
 	 * @param address Address to write to the configuration
 	 * @param config The config to write to

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f1ed93e..03f2f8e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client;
 
 import java.net.InetSocketAddress;
@@ -22,7 +23,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 
 /**
- * A class that provides some utility methods
+ * A class that provides some utility methods.
  */
 public class ClientUtils {
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 20a3366..abd35fc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.client;
 
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -28,18 +26,20 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import java.util.List;
+
 /**
  * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
  *
@@ -50,24 +50,24 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
  * then this executor needs to be explicitly started, to keep running across several executions.</p>
  */
 public class LocalExecutor extends PlanExecutor {
-	
+
 	private static final boolean DEFAULT_OVERWRITE = false;
 
 	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
 
-	/** we lock to ensure singleton execution */
+	/** we lock to ensure singleton execution. */
 	private final Object lock = new Object();
 
-	/** The mini cluster on which to execute the local programs */
+	/** The mini cluster on which to execute the local programs. */
 	private LocalFlinkMiniCluster flink;
 
-	/** Custom user configuration for the execution */
+	/** Custom user configuration for the execution. */
 	private final Configuration configuration;
 
-	/** Config value for how many slots to provide in the local cluster */
+	/** Config value for how many slots to provide in the local cluster. */
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
-	/** Config flag whether to overwrite existing files by default */
+	/** Config flag whether to overwrite existing files by default. */
 	private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
 
 	// ------------------------------------------------------------------------
@@ -93,7 +93,7 @@ public class LocalExecutor extends PlanExecutor {
 	}
 
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
-		this.taskManagerNumSlots = taskManagerNumSlots; 
+		this.taskManagerNumSlots = taskManagerNumSlots;
 	}
 
 	public int getTaskManagerNumSlots() {
@@ -119,7 +119,7 @@ public class LocalExecutor extends PlanExecutor {
 			}
 		}
 	}
-	
+
 	@Override
 	public void stop() throws Exception {
 		synchronized (lock) {
@@ -139,14 +139,14 @@ public class LocalExecutor extends PlanExecutor {
 
 	/**
 	 * Executes the given program on a local runtime and waits for the job to finish.
-	 * 
+	 *
 	 * <p>If the executor has not been started before, this starts the executor and shuts it down
 	 * after the job finished. If the job runs in session mode, the executor is kept alive until
 	 * no more references to the executor exist.</p>
-	 * 
+	 *
 	 * @param plan The plan of the program to execute.
 	 * @return The net runtime of the program, in milliseconds.
-	 * 
+	 *
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
@@ -236,18 +236,17 @@ public class LocalExecutor extends PlanExecutor {
 		return configuration;
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Static variants that internally bring up an instance and shut it down after the execution
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Executes the given program.
-	 * 
+	 *
 	 * @param pa The program.
 	 * @param args The parameters.
 	 * @return The execution result of the program.
-	 * 
+	 *
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
@@ -257,10 +256,10 @@ public class LocalExecutor extends PlanExecutor {
 
 	/**
 	 * Executes the given dataflow plan.
-	 * 
-	 * @param plan The dataflow plan. 
+	 *
+	 * @param plan The dataflow plan.
 	 * @return The execution result.
-	 * 
+	 *
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
@@ -270,7 +269,7 @@ public class LocalExecutor extends PlanExecutor {
 
 	/**
 	 * Creates a JSON representation of the given dataflow's execution plan.
-	 * 
+	 *
 	 * @param plan The dataflow plan.
 	 * @return The dataflow's execution plan, as a JSON string.
 	 * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
@@ -287,7 +286,7 @@ public class LocalExecutor extends PlanExecutor {
 
 	/**
 	 * Creates a JSON representation of the given dataflow plan.
-	 * 
+	 *
 	 * @param plan The dataflow plan.
 	 * @return The dataflow plan (prior to optimization) as a JSON string.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 86b36b3..4a3cc74 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.client;
 
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -30,21 +25,26 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
  * and ships it to a remote Flink cluster for execution.
- * 
+ *
  * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
  * set of libraries that need to be shipped together with the program.</p>
- * 
+ *
  * <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
  * remotely execute program parts.</p>
  */
@@ -62,7 +62,6 @@ public class RemoteExecutor extends PlanExecutor {
 
 	private int defaultParallelism = 1;
 
-
 	public RemoteExecutor(String hostname, int port) {
 		this(hostname, port, new Configuration(), Collections.<URL>emptyList(),
 				Collections.<URL>emptyList());
@@ -109,7 +108,6 @@ public class RemoteExecutor extends PlanExecutor {
 		this.jarFiles = jarFiles;
 		this.globalClasspaths = globalClasspaths;
 
-
 		clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
 		clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
 	}
@@ -134,7 +132,7 @@ public class RemoteExecutor extends PlanExecutor {
 	/**
 	 * Gets the parallelism that will be used when neither the program does not define
 	 * any parallelism at all.
-	 * 
+	 *
 	 * @return The default parallelism for the executor.
 	 */
 	public int getDefaultParallelism() {
@@ -145,7 +143,6 @@ public class RemoteExecutor extends PlanExecutor {
 	//  Startup & Shutdown
 	// ------------------------------------------------------------------------
 
-
 	@Override
 	public void start() throws Exception {
 		synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
index 54e1a23..62cede2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
@@ -22,7 +23,7 @@ import org.apache.commons.cli.CommandLine;
 import static org.apache.flink.client.cli.CliFrontendParser.CANCEL_WITH_SAVEPOINT_OPTION;
 
 /**
- * Command line options for the CANCEL command
+ * Command line options for the CANCEL command.
  */
 public class CancelOptions extends CommandLineOptions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
index 932c66d..027be07 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 /**
@@ -27,4 +28,4 @@ public class CliArgsException extends Exception {
 	public CliArgsException(String message) {
 		super(message);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index f1766b0..9e54ab7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -15,20 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.configuration.ConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A simple command line parser (based on Apache Commons CLI) that extracts command
  * line options.
@@ -185,7 +186,6 @@ public class CliFrontendParser {
 		return addCustomCliOptions(options, true);
 	}
 
-
 	private static Options getJobManagerAddressOption(Options options) {
 		options.addOption(ADDRESS_OPTION);
 		return options;
@@ -374,7 +374,7 @@ public class CliFrontendParser {
 	}
 
 	/**
-	 * Adds custom cli options
+	 * Adds custom cli options.
 	 * @param options The options to add options to
 	 * @param runOptions Whether to include run options
 	 * @return Options with additions
@@ -390,7 +390,7 @@ public class CliFrontendParser {
 	}
 
 	/**
-	 * Prints custom cli options
+	 * Prints custom cli options.
 	 * @param formatter The formatter to use for printing
 	 * @param runOptions True if the run options should be printed, False to print only general options
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
index f6f6319..a9a29b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
@@ -20,8 +20,8 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
-import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
 
 /**
  * Base class for all options parsed from the command line.
@@ -35,7 +35,6 @@ public abstract class CommandLineOptions {
 
 	private final boolean printHelp;
 
-
 	protected CommandLineOptions(CommandLine line) {
 		this.commandLine = line;
 		this.printHelp = line.hasOption(HELP_OPTION.getOpt());

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index a4cb479..9ddaf9e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -15,24 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
 import java.net.URL;
 import java.util.List;
 
-
 /**
  * Custom command-line interface to load hooks for the command-line interface.
  */
 public interface CustomCommandLine<ClusterType extends ClusterClient> {
 
 	/**
-	 * Signals whether the custom command-line wants to execute or not
+	 * Signals whether the custom command-line wants to execute or not.
 	 * @param commandLine The command-line options
 	 * @param configuration The Flink configuration
 	 * @return True if the command-line wants to run, False otherwise
@@ -40,7 +41,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 	boolean isActive(CommandLine commandLine, Configuration configuration);
 
 	/**
-	 * Gets the unique identifier of this CustomCommandLine
+	 * Gets the unique identifier of this CustomCommandLine.
 	 * @return A unique identifier
 	 */
 	String getId();
@@ -58,7 +59,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 	void addGeneralOptions(Options baseOptions);
 
 	/**
-	 * Retrieves a client for a running cluster
+	 * Retrieves a client for a running cluster.
 	 * @param commandLine The command-line parameters from the CliFrontend
 	 * @param config The Flink config
 	 * @return Client if a cluster could be retrieved
@@ -69,7 +70,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 			Configuration config) throws UnsupportedOperationException;
 
 	/**
-	 * Creates the client for the cluster
+	 * Creates the client for the cluster.
 	 * @param applicationName The application name to use
 	 * @param commandLine The command-line options parsed by the CliFrontend
 	 * @param config The Flink config to use

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index e94c2f9..49e9752 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -15,10 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.client.cli;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
+package org.apache.flink.client.cli;
 
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
@@ -26,6 +24,9 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
index 83f5c38..559ce94 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
 /**
- * Command line options for the INFO command
+ * Command line options for the INFO command.
  */
 public class InfoOptions extends ProgramOptions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
index 45f39a4..7ae00cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
@@ -23,7 +24,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION;
 
 /**
- * Command line options for the LIST command
+ * Command line options for the LIST command.
  */
 public class ListOptions extends CommandLineOptions {
 
@@ -43,4 +44,4 @@ public class ListOptions extends CommandLineOptions {
 	public boolean getScheduled() {
 		return scheduled;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 80f573e..df25e67 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
-import org.apache.commons.cli.CommandLine;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
+import org.apache.commons.cli.CommandLine;
+
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
index 2e4eb31..08a15d3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
@@ -27,4 +28,4 @@ public class RunOptions extends ProgramOptions {
 	public RunOptions(CommandLine line) throws CliArgsException {
 		super(line);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
index 305b0b4..1c281d6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
@@ -23,7 +24,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION;
 
 /**
- * Command line options for the SAVEPOINT command
+ * Command line options for the SAVEPOINT command.
  */
 public class SavepointOptions extends CommandLineOptions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
index 7f246c8..6fb03ec 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
 /**
- * Command line options for the STOP command
+ * Command line options for the STOP command.
  */
 public class StopOptions extends CommandLineOptions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 59cece3..29836a4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.deployment;
 
-
 import org.apache.flink.client.program.ClusterClient;
 
 /**
@@ -27,7 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
 public interface ClusterDescriptor<ClientType extends ClusterClient> {
 
 	/**
-	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...)
+	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...).
 	 *
 	 */
 	String getClusterDescription();
@@ -41,7 +40,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
 	ClientType retrieve(String applicationID) throws UnsupportedOperationException;
 
 	/**
-	 * Triggers deployment of a cluster
+	 * Triggers deployment of a cluster.
 	 * @return Client for the cluster
 	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 7a3d4d4..699de3b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.deployment;
 
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
-
 /**
- * A deployment descriptor for an existing cluster
+ * A deployment descriptor for an existing cluster.
  */
 public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e7314eb..3018a8c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import akka.actor.ActorSystem;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -59,13 +58,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
+import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -75,32 +71,38 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
  */
 public abstract class ClusterClient {
 
-	private final Logger LOG = LoggerFactory.getLogger(getClass());
+	private final Logger log = LoggerFactory.getLogger(getClass());
 
-	/** The optimizer used in the optimization of batch programs */
+	/** The optimizer used in the optimization of batch programs. */
 	final Optimizer compiler;
 
 	/** The actor system used to communicate with the JobManager. Lazily initialized upon first use */
 	protected final LazyActorSystemLoader actorSystemLoader;
 
-	/** Configuration of the client */
+	/** Configuration of the client. */
 	protected final Configuration flinkConfig;
 
-	/** Timeout for futures */
+	/** Timeout for futures. */
 	protected final FiniteDuration timeout;
 
-	/** Lookup timeout for the job manager retrieval service */
+	/** Lookup timeout for the job manager retrieval service. */
 	private final FiniteDuration lookupTimeout;
 
-	/** Service factory for high available */
+	/** Service factory for high available. */
 	protected final HighAvailabilityServices highAvailabilityServices;
 
-	/** Flag indicating whether to sysout print execution updates */
+	/** Flag indicating whether to sysout print execution updates. */
 	private boolean printStatusDuringExecution = true;
 
 	/**
@@ -110,7 +112,7 @@ public abstract class ClusterClient {
 	 */
 	private JobExecutionResult lastJobExecutionResult;
 
-	/** Switch for blocking/detached job submission of the client */
+	/** Switch for blocking/detached job submission of the client. */
 	private boolean detachedJobSubmission = false;
 
 	// ------------------------------------------------------------------------
@@ -153,7 +155,7 @@ public abstract class ClusterClient {
 			highAvailabilityServices,
 			Time.milliseconds(lookupTimeout.toMillis()),
 			flinkConfig,
-			LOG);
+			log);
 
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 	}
@@ -162,9 +164,12 @@ public abstract class ClusterClient {
 	//  Startup & Shutdown
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Utility class to lazily instantiate an {@link ActorSystem}.
+	 */
 	protected static class LazyActorSystemLoader {
 
-		private final Logger LOG;
+		private final Logger log;
 
 		private final HighAvailabilityServices highAvailabilityServices;
 
@@ -178,11 +183,11 @@ public abstract class ClusterClient {
 				HighAvailabilityServices highAvailabilityServices,
 				Time timeout,
 				Configuration configuration,
-				Logger LOG) {
+				Logger log) {
 			this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 			this.timeout = Preconditions.checkNotNull(timeout);
 			this.configuration = Preconditions.checkNotNull(configuration);
-			this.LOG = Preconditions.checkNotNull(LOG);
+			this.log = Preconditions.checkNotNull(log);
 		}
 
 		/**
@@ -210,7 +215,7 @@ public abstract class ClusterClient {
 
 			if (!isLoaded()) {
 				// start actor system
-				LOG.info("Starting client actor system.");
+				log.info("Starting client actor system.");
 
 				final InetAddress ownHostname;
 				try {
@@ -296,15 +301,13 @@ public abstract class ClusterClient {
 	// ------------------------------------------------------------------------
 
 	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
-			throws CompilerException, ProgramInvocationException
-	{
+			throws CompilerException, ProgramInvocationException {
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
 		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
 	}
 
 	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
-			throws CompilerException, ProgramInvocationException
-	{
+			throws CompilerException, ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
 			return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
@@ -347,8 +350,7 @@ public abstract class ClusterClient {
 	 * @throws ProgramInvocationException
 	 */
 	public JobSubmissionResult run(PackagedProgram prog, int parallelism)
-			throws ProgramInvocationException, ProgramMissingJobException
-	{
+			throws ProgramInvocationException, ProgramMissingJobException {
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
 
@@ -362,7 +364,7 @@ public abstract class ClusterClient {
 			return run(jobWithJars, parallelism, prog.getSavepointSettings());
 		}
 		else if (prog.isUsingInteractiveMode()) {
-			LOG.info("Starting program in interactive mode");
+			log.info("Starting program in interactive mode");
 
 			final List<URL> libraries;
 			if (hasUserJarsInClassPath(prog.getAllLibraries())) {
@@ -436,8 +438,7 @@ public abstract class ClusterClient {
 
 	public JobSubmissionResult run(FlinkPlan compiledPlan,
 			List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
-		throws ProgramInvocationException
-	{
+			throws ProgramInvocationException {
 		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
 		return submitJob(job, classLoader);
 	}
@@ -507,7 +508,7 @@ public abstract class ClusterClient {
 	}
 
 	/**
-	 * Reattaches to a running from from the supplied job id
+	 * Reattaches to a running from from the supplied job id.
 	 * @param jobID The job id of the job to attach to
 	 * @return The JobExecutionResult for the jobID
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
@@ -612,7 +613,7 @@ public abstract class ClusterClient {
 	 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
 	 * a while after sending the stop command, because after sources stopped to emit data all operators
 	 * need to finish processing.
-	 * 
+	 *
 	 * @param jobId
 	 *            the job ID of the streaming program to stop
 	 * @throws Exception
@@ -632,10 +633,10 @@ public abstract class ClusterClient {
 		final Object result = Await.result(response, timeout);
 
 		if (result instanceof JobManagerMessages.StoppingSuccess) {
-			LOG.info("Job stopping with ID " + jobId + " succeeded.");
+			log.info("Job stopping with ID " + jobId + " succeeded.");
 		} else if (result instanceof JobManagerMessages.StoppingFailure) {
 			final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
-			LOG.info("Job stopping with ID " + jobId + " failed.", t);
+			log.info("Job stopping with ID " + jobId + " failed.", t);
 			throw new Exception("Failed to stop the job because of \n" + t.getMessage());
 		} else {
 			throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
@@ -685,14 +686,13 @@ public abstract class ClusterClient {
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  Sessions
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Tells the JobManager to finish the session (job) defined by the given ID.
-	 * 
+	 *
 	 * @param jobId The ID that identifies the session.
 	 */
 	public void endSession(JobID jobId) throws Exception {
@@ -713,10 +713,10 @@ public abstract class ClusterClient {
 		}
 
 		ActorGateway jobManagerGateway = getJobManagerGateway();
-		
+
 		for (JobID jid : jobIds) {
 			if (jid != null) {
-				LOG.info("Telling job manager to end the session {}.", jid);
+				log.info("Telling job manager to end the session {}.", jid);
 				jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
 			}
 		}
@@ -760,7 +760,7 @@ public abstract class ClusterClient {
 				throw new RuntimeException("URL is invalid. This should not happen.", e);
 			}
 		}
- 
+
 		job.setClasspaths(classpaths);
 
 		return job;
@@ -778,7 +778,7 @@ public abstract class ClusterClient {
 	 * @throws Exception
 	 */
 	public ActorGateway getJobManagerGateway() throws Exception {
-		LOG.debug("Looking up JobManager");
+		log.debug("Looking up JobManager");
 
 		try {
 			return LeaderRetrievalUtils.retrieveLeaderGateway(
@@ -796,7 +796,7 @@ public abstract class ClusterClient {
 	 * @param message The message to log/print
 	 */
 	protected void logAndSysout(String message) {
-		LOG.info(message);
+		log.info(message);
 		if (printStatusDuringExecution) {
 			System.out.println(message);
 		}
@@ -809,18 +809,18 @@ public abstract class ClusterClient {
 	/**
 	 * Blocks until the client has determined that the cluster is ready for Job submission.
 	 *
-	 * This is delayed until right before job submission to report any other errors first
+	 * <p>This is delayed until right before job submission to report any other errors first
 	 * (e.g. invalid job definitions/errors in the user jar)
 	 */
 	public abstract void waitForClusterToBeReady();
 
 	/**
-	 * Returns an URL (as a string) to the JobManager web interface
+	 * Returns an URL (as a string) to the JobManager web interface.
 	 */
 	public abstract String getWebInterfaceURL();
 
 	/**
-	 * Returns the latest cluster status, with number of Taskmanagers and slots
+	 * Returns the latest cluster status, with number of Taskmanagers and slots.
 	 */
 	public abstract GetClusterStatusResponse getClusterStatus();
 
@@ -857,7 +857,7 @@ public abstract class ClusterClient {
 	}
 
 	/**
-	 * Return the Flink configuration object
+	 * Return the Flink configuration object.
 	 * @return The Flink configuration object
 	 */
 	public Configuration getFlinkConfiguration() {
@@ -865,7 +865,7 @@ public abstract class ClusterClient {
 	}
 
 	/**
-	 * The client may define an upper limit on the number of slots to use
+	 * The client may define an upper limit on the number of slots to use.
 	 * @return -1 if unknown
 	 */
 	public abstract int getMaxSlots();

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 1ef94ce..7e47825 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -40,11 +40,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	protected final List<URL> jarFilesToAttach;
 
 	protected final List<URL> classpathsToAttach;
-	
+
 	protected final ClassLoader userCodeClassLoader;
 
 	protected final SavepointRestoreSettings savepointSettings;
-	
+
 	public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
 				ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) {
 		this.client = remoteConnection;
@@ -83,11 +83,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
 				+ ") : " + getIdString();
 	}
-	
+
 	public ClusterClient getClient() {
 		return this.client;
 	}
-	
+
 	public List<URL> getJars(){
 		return jarFilesToAttach;
 	}
@@ -105,11 +105,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	static void setAsContext(ContextEnvironmentFactory factory) {
 		initializeContextEnvironment(factory);
 	}
-	
+
 	static void unsetContext() {
 		resetContextEnvironment();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index 0175d4c..6209254 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -51,8 +51,7 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
 	public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
 			List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
-			boolean isDetached, SavepointRestoreSettings savepointSettings)
-	{
+			boolean isDetached, SavepointRestoreSettings savepointSettings) {
 		this.client = client;
 		this.jarFilesToAttach = jarFilesToAttach;
 		this.classpathsToAttach = classpathsToAttach;
@@ -68,9 +67,9 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 			throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
 		}
 
-		lastEnvCreated = isDetached ?
-				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings):
-				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
+		lastEnvCreated = isDetached
+			? new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings)
+			: new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
 		if (defaultParallelism > 0) {
 			lastEnvCreated.setParallelism(defaultParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index c67688f..63aa811 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,6 +77,9 @@ public class DetachedEnvironment extends ContextEnvironment {
 		return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
 	}
 
+	/**
+	 * The {@link JobExecutionResult} returned by a {@link DetachedEnvironment}.
+	 */
 	public static final class DetachedJobExecutionResult extends JobExecutionResult {
 
 		public static final DetachedJobExecutionResult INSTANCE = new DetachedJobExecutionResult();

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index d5a3014..ae94ece 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -26,21 +29,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
-
 /**
  * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
  * the classes of the functions and libraries necessary for the execution.
  */
 public class JobWithJars {
-	
+
 	private Plan plan;
-	
+
 	private List<URL> jarFiles;
 
 	/**
-	 * classpaths that are needed during user code execution
+	 * classpaths that are needed during user code execution.
 	 */
 	private List<URL> classpaths;
 
@@ -68,7 +68,7 @@ public class JobWithJars {
 		this.jarFiles = Collections.singletonList(jarFile);
 		this.classpaths = Collections.<URL>emptyList();
 	}
-	
+
 	JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
 		this.plan = plan;
 		this.jarFiles = jarFiles;
@@ -77,7 +77,7 @@ public class JobWithJars {
 	}
 
 	/**
-	 * Returns the plan
+	 * Returns the plan.
 	 */
 	public Plan getPlan() {
 		return this.plan;
@@ -89,17 +89,17 @@ public class JobWithJars {
 	public List<URL> getJarFiles() {
 		return this.jarFiles;
 	}
-	
+
 	/**
 	 * Returns list of classpaths that need to be submitted with the plan.
 	 */
 	public List<URL> getClasspaths() {
 		return classpaths;
 	}
-	
+
 	/**
 	 * Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
-	 * 
+	 *
 	 * @return The user code ClassLoader.
 	 */
 	public ClassLoader getUserCodeClassLoader() {
@@ -108,7 +108,6 @@ public class JobWithJars {
 		}
 		return this.userCodeClassLoader;
 	}
-	
 
 	public static void checkJarFile(URL jar) throws IOException {
 		File jarFile;
@@ -125,7 +124,7 @@ public class JobWithJars {
 		}
 		// TODO: Check if proper JAR file
 	}
-	
+
 	public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
 		URL[] urls = new URL[jars.size() + classpaths.size()];
 		for (int i = 0; i < jars.size(); i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index 64076de..faacd9f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -28,6 +28,9 @@ import org.apache.flink.optimizer.plan.FlinkPlan;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
+/**
+ * An {@link ExecutionEnvironment} that never executes a job but only creates the optimized plan.
+ */
 public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 
 	private final Optimizer compiler;
@@ -41,7 +44,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 	// ------------------------------------------------------------------------
 	//  Execution Environment methods
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan plan = createProgramPlan(jobName);
@@ -66,7 +69,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 	}
 
 	public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
-		
+
 		// temporarily write syserr and sysout to a byte array.
 		PrintStream originalOut = System.out;
 		PrintStream originalErr = System.err;
@@ -98,10 +101,10 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 
 		String stdout = baos.toString();
 		String stderr = baes.toString();
-		
+
 		throw new ProgramInvocationException(
 				"The program plan could not be fetched - the program aborted pre-maturely."
-						+ "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout) 
+						+ "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout)
 						+ "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stderr));
 	}
 	// ------------------------------------------------------------------------
@@ -116,13 +119,13 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
-	
+
 	private void unsetAsContext() {
 		resetContextEnvironment();
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	public void setPlan(FlinkPlan plan){
 		this.optimizerPlan = plan;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index a44ee46..35bb04f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.Plan;
@@ -63,7 +62,7 @@ public class PackagedProgram {
 	 * Property name of the entry in JAR manifest file that describes the Flink specific entry point.
 	 */
 	public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
-	
+
 	/**
 	 * Property name of the entry in JAR manifest file that describes the class with the main method.
 	 */
@@ -74,17 +73,17 @@ public class PackagedProgram {
 	private final URL jarFile;
 
 	private final String[] args;
-	
+
 	private final Program program;
-	
+
 	private final Class<?> mainClass;
-	
+
 	private final List<File> extractedTempLibraries;
 
 	private final List<URL> classpaths;
-	
+
 	private ClassLoader userCodeClassLoader;
-	
+
 	private Plan plan;
 
 	private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
@@ -110,7 +109,7 @@ public class PackagedProgram {
 	/**
 	 * Creates an instance that wraps the plan defined in the jar file using the given
 	 * argument.
-	 * 
+	 *
 	 * @param jarFile
 	 *        The jar file which contains the plan and a Manifest which defines
 	 *        the program-class
@@ -131,7 +130,7 @@ public class PackagedProgram {
 	 * Creates an instance that wraps the plan defined in the jar file using the given
 	 * arguments. For generating the plan the class defined in the className parameter
 	 * is used.
-	 * 
+	 *
 	 * @param jarFile
 	 *        The jar file which contains the plan.
 	 * @param entryPointClassName
@@ -152,7 +151,7 @@ public class PackagedProgram {
 	 * Creates an instance that wraps the plan defined in the jar file using the given
 	 * arguments. For generating the plan the class defined in the className parameter
 	 * is used.
-	 * 
+	 *
 	 * @param jarFile
 	 *        The jar file which contains the plan.
 	 * @param classpaths
@@ -171,32 +170,32 @@ public class PackagedProgram {
 		if (jarFile == null) {
 			throw new IllegalArgumentException("The jar file must not be null.");
 		}
-		
+
 		URL jarFileUrl;
 		try {
 			jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
 		} catch (MalformedURLException e1) {
 			throw new IllegalArgumentException("The jar file path is invalid.");
 		}
-		
+
 		checkJarFile(jarFileUrl);
-		
+
 		this.jarFile = jarFileUrl;
 		this.args = args == null ? new String[0] : args;
-		
+
 		// if no entryPointClassName name was given, we try and look one up through the manifest
 		if (entryPointClassName == null) {
 			entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
 		}
-		
+
 		// now that we have an entry point, we can extract the nested jar files (if any)
 		this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
 		this.classpaths = classpaths;
 		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
-		
+
 		// load the entry point class
 		this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
-		
+
 		// if the entry point is a program, instantiate the class and get the plan
 		if (Program.class.isAssignableFrom(this.mainClass)) {
 			Program prg = null;
@@ -206,7 +205,7 @@ public class PackagedProgram {
 				// validate that the class has a main method at least.
 				// the main method possibly instantiates the program properly
 				if (!hasMainMethod(mainClass)) {
-					throw new ProgramInvocationException("The given program class implements the " + 
+					throw new ProgramInvocationException("The given program class implements the " +
 							Program.class.getName() + " interface, but cannot be instantiated. " +
 							"It also declares no main(String[]) method as alternative entry point", e);
 				}
@@ -217,22 +216,22 @@ public class PackagedProgram {
 		} else if (hasMainMethod(mainClass)) {
 			this.program = null;
 		} else {
-			throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " + 
+			throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
 					Program.class.getName() + " interface.");
 		}
 	}
-	
+
 	PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
 		this.jarFile = null;
 		this.args = args == null ? new String[0] : args;
-		
+
 		this.extractedTempLibraries = Collections.emptyList();
 		this.classpaths = Collections.emptyList();
 		this.userCodeClassLoader = entryPointClass.getClassLoader();
-		
+
 		// load the entry point class
 		this.mainClass = entryPointClass;
-		
+
 		// if the entry point is a program, instantiate the class and get the plan
 		if (Program.class.isAssignableFrom(this.mainClass)) {
 			Program prg = null;
@@ -242,7 +241,7 @@ public class PackagedProgram {
 				// validate that the class has a main method at least.
 				// the main method possibly instantiates the program properly
 				if (!hasMainMethod(mainClass)) {
-					throw new ProgramInvocationException("The given program class implements the " + 
+					throw new ProgramInvocationException("The given program class implements the " +
 							Program.class.getName() + " interface, but cannot be instantiated. " +
 							"It also declares no main(String[]) method as alternative entry point", e);
 				}
@@ -253,7 +252,7 @@ public class PackagedProgram {
 		} else if (hasMainMethod(mainClass)) {
 			this.program = null;
 		} else {
-			throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " + 
+			throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
 					Program.class.getName() + " interface.");
 		}
 	}
@@ -269,15 +268,15 @@ public class PackagedProgram {
 	public String[] getArguments() {
 		return this.args;
 	}
-	
+
 	public String getMainClassName() {
 		return this.mainClass.getName();
 	}
-	
+
 	public boolean isUsingInteractiveMode() {
 		return this.program == null;
 	}
-	
+
 	public boolean isUsingProgramEntryPoint() {
 		return this.program != null;
 	}
@@ -410,7 +409,6 @@ public class PackagedProgram {
 	}
 
 	/**
-	 *
 	 * This method assumes that the context environment is prepared, or the execution
 	 * will be a local execution by default.
 	 */
@@ -468,11 +466,10 @@ public class PackagedProgram {
 		deleteExtractedLibraries(this.extractedTempLibraries);
 		this.extractedTempLibraries.clear();
 	}
-	
-	
+
 	/**
 	 * Returns the plan as generated from the Pact Assembler.
-	 * 
+	 *
 	 * @return The program's plan.
 	 * @throws ProgramInvocationException Thrown, if an error occurred in the program while
 	 *         creating the program's {@link Plan}.
@@ -482,10 +479,10 @@ public class PackagedProgram {
 			Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
 			this.plan = createPlanFromProgram(this.program, this.args);
 		}
-		
+
 		return this.plan;
 	}
-	
+
 	private static boolean hasMainMethod(Class<?> entryClass) {
 		Method mainMethod;
 		try {
@@ -494,13 +491,13 @@ public class PackagedProgram {
 			return false;
 		}
 		catch (Throwable t) {
-			throw new RuntimeException("Could not look up the main(String[]) method from the class " + 
+			throw new RuntimeException("Could not look up the main(String[]) method from the class " +
 					entryClass.getName() + ": " + t.getMessage(), t);
 		}
-		
+
 		return Modifier.isStatic(mainMethod.getModifiers()) && Modifier.isPublic(mainMethod.getModifiers());
 	}
-	
+
 	private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
 		Method mainMethod;
 		if (!Modifier.isPublic(entryClass.getModifiers())) {
@@ -513,17 +510,17 @@ public class PackagedProgram {
 			throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
 		}
 		catch (Throwable t) {
-			throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " + 
+			throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
 					entryClass.getName() + ": " + t.getMessage(), t);
 		}
-		
+
 		if (!Modifier.isStatic(mainMethod.getModifiers())) {
 			throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
 		}
 		if (!Modifier.isPublic(mainMethod.getModifiers())) {
 			throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
 		}
-		
+
 		try {
 			mainMethod.invoke(null, (Object) args);
 		}
@@ -574,20 +571,19 @@ public class PackagedProgram {
 				throw new ProgramInvocationException("The Manifest in the jar file could not be accessed '"
 					+ jarFile.getPath() + "'. " + ioex.getMessage(), ioex);
 			}
-	
+
 			if (manifest == null) {
 				throw new ProgramInvocationException("No manifest found in jar file '" + jarFile.getPath() + "'. The manifest is need to point to the program's main class.");
 			}
-	
+
 			Attributes attributes = manifest.getMainAttributes();
-			
+
 			// check for a "program-class" entry first
 			className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
 			if (className != null) {
 				return className;
 			}
-			
-			
+
 			// check for a main class
 			className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
 			if (className != null) {
@@ -605,7 +601,7 @@ public class PackagedProgram {
 			}
 		}
 	}
-	
+
 	private static Class<?> loadMainClass(String className, ClassLoader cl) throws ProgramInvocationException {
 		ClassLoader contextCl = null;
 		try {
@@ -627,20 +623,20 @@ public class PackagedProgram {
 		}
 		catch (Throwable t) {
 			throw new ProgramInvocationException("The program's entry point class '" + className
-				+ "' caused an exception during initialization: "+ t.getMessage(), t);
+				+ "' caused an exception during initialization: " + t.getMessage(), t);
 		} finally {
 			if (contextCl != null) {
 				Thread.currentThread().setContextClassLoader(contextCl);
 			}
 		}
 	}
-	
+
 	/**
 	 * Takes the jar described by the given file and invokes its pact assembler class to
 	 * assemble a plan. The assembler class name is either passed through a parameter,
 	 * or it is read from the manifest of the jar. The assembler is handed the given options
 	 * for its assembly.
-	 * 
+	 *
 	 * @param program The program to create the plan for.
 	 * @param options
 	 *        The options for the assembler.
@@ -655,33 +651,33 @@ public class PackagedProgram {
 			throw new ProgramInvocationException("Error while calling the program: " + t.getMessage(), t);
 		}
 	}
-	
+
 	/**
 	 * Takes all JAR files that are contained in this program's JAR file and extracts them
 	 * to the system's temp directory.
-	 * 
+	 *
 	 * @return The file names of the extracted temporary files.
 	 * @throws ProgramInvocationException Thrown, if the extraction process failed.
 	 */
 	public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException {
-		
+
 		Random rnd = new Random();
-		
+
 		JarFile jar = null;
 		try {
 			jar = new JarFile(new File(jarFile.toURI()));
 			final List<JarEntry> containedJarFileEntries = new ArrayList<JarEntry>();
-			
+
 			Enumeration<JarEntry> entries = jar.entries();
 			while (entries.hasMoreElements()) {
 				JarEntry entry = entries.nextElement();
 				String name = entry.getName();
-				
+
 				if (name.length() > 8 && name.startsWith("lib/") && name.endsWith(".jar")) {
 					containedJarFileEntries.add(entry);
 				}
 			}
-			
+
 			if (containedJarFileEntries.isEmpty()) {
 				return Collections.emptyList();
 			}
@@ -689,15 +685,15 @@ public class PackagedProgram {
 				// go over all contained jar files
 				final List<File> extractedTempLibraries = new ArrayList<File>(containedJarFileEntries.size());
 				final byte[] buffer = new byte[4096];
-				
+
 				boolean incomplete = true;
-				
+
 				try {
 					for (int i = 0; i < containedJarFileEntries.size(); i++) {
 						final JarEntry entry = containedJarFileEntries.get(i);
 						String name = entry.getName();
 						name = name.replace(File.separatorChar, '_');
-					
+
 						File tempFile;
 						try {
 							tempFile = File.createTempFile(rnd.nextInt(Integer.MAX_VALUE) + "_", name);
@@ -705,21 +701,20 @@ public class PackagedProgram {
 						}
 						catch (IOException e) {
 							throw new ProgramInvocationException(
-								"An I/O error occurred while creating temporary file to extract nested library '" + 
+								"An I/O error occurred while creating temporary file to extract nested library '" +
 										entry.getName() + "'.", e);
 						}
-						
+
 						extractedTempLibraries.add(tempFile);
-						
+
 						// copy the temp file contents to a temporary File
 						OutputStream out = null;
-						InputStream in = null; 
+						InputStream in = null;
 						try {
-							
-							
+
 							out = new FileOutputStream(tempFile);
 							in = new BufferedInputStream(jar.getInputStream(entry));
-							
+
 							int numRead = 0;
 							while ((numRead = in.read(buffer)) != -1) {
 								out.write(buffer, 0, numRead);
@@ -738,7 +733,7 @@ public class PackagedProgram {
 							}
 						}
 					}
-					
+
 					incomplete = false;
 				}
 				finally {
@@ -746,7 +741,7 @@ public class PackagedProgram {
 						deleteExtractedLibraries(extractedTempLibraries);
 					}
 				}
-				
+
 				return extractedTempLibraries;
 			}
 		}
@@ -761,13 +756,13 @@ public class PackagedProgram {
 			}
 		}
 	}
-	
+
 	public static void deleteExtractedLibraries(List<File> tempLibraries) {
 		for (File f : tempLibraries) {
 			f.delete();
 		}
 	}
-	
+
 	private static void checkJarFile(URL jarfile) throws ProgramInvocationException {
 		try {
 			JobWithJars.checkJarFile(jarfile);

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
index 0051e60..271864f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -69,11 +69,10 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
-	
+
 	public void unsetAsContext() {
 		resetContextEnvironment();
 	}
-	
 
 	public void setPreview(String preview) {
 		this.preview = preview;
@@ -82,4 +81,4 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
 	public Plan getPlan() {
 		return plan;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
index 68bcba6..ee58227 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.program;
 
 /**
@@ -30,7 +29,7 @@ public class ProgramInvocationException extends Exception {
 
 	/**
 	 * Creates a <tt>ProgramInvocationException</tt> with the given message.
-	 * 
+	 *
 	 * @param message
 	 *        The message for the exception.
 	 */
@@ -40,7 +39,7 @@ public class ProgramInvocationException extends Exception {
 
 	/**
 	 * Creates a <tt>ProgramInvocationException</tt> for the given exception.
-	 * 
+	 *
 	 * @param cause
 	 *        The exception that causes the program invocation to fail.
 	 */
@@ -51,7 +50,7 @@ public class ProgramInvocationException extends Exception {
 	/**
 	 * Creates a <tt>ProgramInvocationException</tt> for the given exception with an
 	 * additional message.
-	 * 
+	 *
 	 * @param message
 	 *        The additional message.
 	 * @param cause

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index b00e519..19a365e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -25,13 +26,14 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
 /**
  * Cluster client for communication with an standalone (on-premise) cluster or an existing cluster that has been
  * brought up independently of a specific job.
@@ -49,7 +51,6 @@ public class StandaloneClusterClient extends ClusterClient {
 	@Override
 	public void waitForClusterToBeReady() {}
 
-
 	@Override
 	public String getWebInterfaceURL() {
 		String host = getJobManagerAddress().getHostString();

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 8320e04..28c3226 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.junit.Before;
+
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 5cc90eb..5a79bb6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -23,10 +23,14 @@ import org.junit.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for the "info" command.
+ */
 public class CliFrontendInfoTest {
-	
+
 	private static PrintStream stdOut;
 	private static PrintStream capture;
 	private static ByteArrayOutputStream buffer;
@@ -41,7 +45,7 @@ public class CliFrontendInfoTest {
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode != 0);
 			}
-			
+
 			// test missing options
 			{
 				String[] parameters = {};
@@ -55,7 +59,7 @@ public class CliFrontendInfoTest {
 			fail("Program caused an exception: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testShowExecutionPlan() {
 		replaceStdOut();
@@ -74,7 +78,7 @@ public class CliFrontendInfoTest {
 			restoreStdOut();
 		}
 	}
-	
+
 	@Test
 	public void testShowExecutionPlanWithParallelism() {
 		replaceStdOut();

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 4d3405f..725d95a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -18,17 +18,18 @@
 
 package org.apache.flink.client;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,6 +42,9 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the CANCEL and LIST commands.
+ */
 public class CliFrontendListCancelTest {
 
 	private static ActorSystem actorSystem;
@@ -56,12 +60,12 @@ public class CliFrontendListCancelTest {
 		JavaTestKit.shutdownActorSystem(actorSystem);
 		actorSystem = null;
 	}
-	
+
 	@BeforeClass
 	public static void init() {
 		CliFrontendTestUtils.pipeSystemOutToNull();
 	}
-	
+
 	@Test
 	public void testCancel() {
 		try {
@@ -72,7 +76,7 @@ public class CliFrontendListCancelTest {
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode != 0);
 			}
-			
+
 			// test missing job id
 			{
 				String[] parameters = {};
@@ -80,7 +84,7 @@ public class CliFrontendListCancelTest {
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode != 0);
 			}
-			
+
 			// test cancel properly
 			{
 				JobID jid = new JobID();
@@ -96,7 +100,7 @@ public class CliFrontendListCancelTest {
 				);
 
 				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-				
+
 				String[] parameters = { jidString };
 				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
 
@@ -205,7 +209,7 @@ public class CliFrontendListCancelTest {
 				int retCode = testFrontend.list(parameters);
 				assertTrue(retCode != 0);
 			}
-			
+
 			// test list properly
 			{
 				final UUID leaderSessionID = UUID.randomUUID();
@@ -230,8 +234,7 @@ public class CliFrontendListCancelTest {
 		}
 	}
 
-
-	protected static final class InfoListTestCliFrontend extends CliFrontend {
+	private static final class InfoListTestCliFrontend extends CliFrontend {
 
 		private ActorGateway jobManagerGateway;
 
@@ -246,7 +249,7 @@ public class CliFrontendListCancelTest {
 		}
 	}
 
-	protected static final class CliJobManager extends FlinkUntypedActor {
+	private static final class CliJobManager extends FlinkUntypedActor {
 		private final JobID jobID;
 		private final UUID leaderSessionID;
 		private final String targetDirectory;


Mime
View raw message