flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [14/15] flink git commit: [FLINK-6701] Activate strict checkstyle for flink-yarn
Date Fri, 26 May 2017 19:16:23 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 63e6a4c..6099d18 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -37,75 +37,74 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-import org.apache.flink.util.ExceptionUtils;
 
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
 public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
-	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
-	/** The process environment variables */
-	private final Map<String, String> ENV;
+	/** The process environment variables. */
+	private final Map<String, String> env;
 
 	/** The default registration timeout for task executor in seconds. */
-	private final static int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;
+	private static final int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;
 
-	/** The heartbeat interval while the resource master is waiting for containers */
+	/** The heartbeat interval while the resource master is waiting for containers. */
 	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
 
-	/** The default heartbeat interval during regular operation */
+	/** The default heartbeat interval during regular operation. */
 	private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
 
-	/** The default memory of task executor to allocate (in MB) */
+	/** The default memory of task executor to allocate (in MB). */
 	private static final int DEFAULT_TSK_EXECUTOR_MEMORY_SIZE = 1024;
 
 	/** Environment variable name of the final container id used by the YarnResourceManager.
 	 * Container ID generation may vary across Hadoop versions. */
-	final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
-	
+	static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+
 	/** Environment variable name of the hostname given by the YARN.
 	 * In task executor we use the hostnames given by YARN consistently throughout akka */
-	final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+	static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
-	/** Default heartbeat interval between this resource manager and the YARN ResourceManager */
+	/** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
 	private final int yarnHeartbeatIntervalMillis;
 
 	private final Configuration flinkConfig;
 
 	private final YarnConfiguration yarnConfig;
 
-	/** Client to communicate with the Resource Manager (YARN's master) */
+	/** Client to communicate with the Resource Manager (YARN's master). */
 	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
 
-	/** Client to communicate with the Node manager and launch TaskExecutor processes */
+	/** Client to communicate with the Node manager and launch TaskExecutor processes. */
 	private NMClient nodeManagerClient;
 
-	/** The number of containers requested, but not yet granted */
+	/** The number of containers requested, but not yet granted. */
 	private int numPendingContainerRequests;
 
-	final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
+	private final Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
 
 	public YarnResourceManager(
 			RpcService rpcService,
@@ -133,7 +132,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			fatalErrorHandler);
 		this.flinkConfig  = flinkConfig;
 		this.yarnConfig = new YarnConfiguration();
-		this.ENV = env;
+		this.env = env;
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
 				ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
 
@@ -161,7 +160,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			//TODO: the third paramter should be the webmonitor address
 			resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
 		} catch (Exception e) {
-			LOG.info("registerApplicationMaster fail", e);
+			log.info("registerApplicationMaster fail", e);
 		}
 
 		// create the client to communicate with the node managers
@@ -204,11 +203,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 
 		// first, de-register from YARN
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
-		LOG.info("Unregistering application from the YARN Resource Manager");
+		log.info("Unregistering application from the YARN Resource Manager");
 		try {
 			resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
 		} catch (Throwable t) {
-			LOG.error("Could not unregister the application master.", t);
+			log.error("Could not unregister the application master.", t);
 		}
 	}
 
@@ -217,8 +216,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		// Priority for worker containers - priorities are intra-application
 		//TODO: set priority according to the resource allocated
 		Priority priority = Priority.newInstance(generatePriority(resourceProfile));
-		int mem = resourceProfile.getMemoryInMB() < 0 ? DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int)resourceProfile.getMemoryInMB();
-		int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores();
+		int mem = resourceProfile.getMemoryInMB() < 0 ? DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int) resourceProfile.getMemoryInMB();
+		int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int) resourceProfile.getCpuCores();
 		Resource capability = Resource.newInstance(mem, vcore);
 		requestYarnContainer(capability, priority);
 	}
@@ -254,7 +253,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	public void onContainersAllocated(List<Container> containers) {
 		for (Container container : containers) {
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-			LOG.info("Received new container: {} - Remaining pending container requests: {}",
+			log.info("Received new container: {} - Remaining pending container requests: {}",
 					container.getId(), numPendingContainerRequests);
 			try {
 				/** Context information used to start a TaskExecutor Java process */
@@ -264,7 +263,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			}
 			catch (Throwable t) {
 				// failed to launch the container, will release the failed one and ask for a new one
-				LOG.error("Could not start TaskManager in container {},", container, t);
+				log.error("Could not start TaskManager in container {},", container, t);
 				resourceManagerClient.releaseAssignedContainer(container.getId());
 				requestYarnContainer(container.getResource(), container.getPriority());
 			}
@@ -279,7 +278,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		try {
 			shutDown();
 		} catch (Exception e) {
-			LOG.warn("Fail to shutdown the YARN resource manager.", e);
+			log.warn("Fail to shutdown the YARN resource manager.", e);
 		}
 	}
 
@@ -317,7 +316,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		}
 	}
 
-	// parse the host and port from akka address, 
+	// parse the host and port from akka address,
 	// the akka address is like akka.tcp://flink@100.81.153.180:49712/user/$a
 	private static Tuple2<String, Integer> parseHostPort(String address) {
 		String[] hostPort = address.split("@")[1].split(":");
@@ -333,35 +332,35 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
 		numPendingContainerRequests++;
-		LOG.info("Requesting new TaskManager container pending requests: {}",
+		log.info("Requesting new TaskManager container pending requests: {}",
 				numPendingContainerRequests);
 	}
 
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
 			throws Exception {
 		// init the ContainerLaunchContext
-		final String currDir = ENV.get(ApplicationConstants.Environment.PWD.key());
+		final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
 
 		final ContaineredTaskManagerParameters taskManagerParameters =
 				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1);
 
-		LOG.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " +
+		log.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " +
 				"JVM direct memory limit {} MB",
 				containerId,
 				taskManagerParameters.taskManagerTotalMemoryMB(),
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
-		int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, 
+		int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
 				DEFAULT_TASK_MANAGER_REGISTRATION_DURATION);
 		FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS);
 		final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
 				flinkConfig, "", 0, 1, teRegistrationTimeout);
-		LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+		log.debug("TaskManager configuration: {}", taskManagerConfig);
 
 		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
-				flinkConfig, yarnConfig, ENV,
+				flinkConfig, yarnConfig, env,
 				taskManagerParameters, taskManagerConfig,
-				currDir, YarnTaskExecutorRunner.class, LOG);
+				currDir, YarnTaskExecutorRunner.class, log);
 
 		// set a special environment variable to uniquely identify this container
 		taskExecutorLaunchContext.getEnvironment()
@@ -373,7 +372,6 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 
 
 
-	
 	/**
 	 * Generate priority by given resource profile.
 	 * Priority is only used for distinguishing request of different resource.

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
index 2372cbc..62729a4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorRef;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
+import akka.actor.ActorRef;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -37,16 +37,16 @@ import java.util.List;
  */
 public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.CallbackHandler {
 
-	/** The yarn master to which we report the callbacks */
+	/** The yarn master to which we report the callbacks. */
 	private ActorRef yarnFrameworkMaster;
 
-	/** The progress we report */
+	/** The progress we report. */
 	private float currentProgress;
 
 	public YarnResourceManagerCallbackHandler() {
 		this(null);
 	}
-	
+
 	public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) {
 		this.yarnFrameworkMaster = yarnFrameworkMaster;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 398a5eb..2ed4c1d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -52,13 +53,12 @@ import java.util.concurrent.Callable;
  */
 public class YarnTaskExecutorRunner {
 
-	/** Logger */
 	protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class);
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	private static final Map<String, String> ENV = System.getenv();
 
-	/** The exit code returned if the initialization of the yarn task executor runner failed */
+	/** The exit code returned if the initialization of the yarn task executor runner failed. */
 	private static final int INIT_ERROR_EXIT_CODE = 31;
 
 	private MetricRegistry metricRegistry;
@@ -131,7 +131,7 @@ public class YarnTaskExecutorRunner {
 			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
 
 			String keytabPath = null;
-			if(remoteKeytabPath != null) {
+			if (remoteKeytabPath != null) {
 				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 				keytabPath = f.getAbsolutePath();
 				LOG.info("keytab path: {}", keytabPath);
@@ -252,7 +252,6 @@ public class YarnTaskExecutorRunner {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-
 	protected void shutdown() {
 			if (taskExecutorRpcService != null) {
 				try {

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 047a1fa..265c5a6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -31,17 +26,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
 /**
  * The entry point for running a TaskManager in a YARN container.
  */
@@ -95,7 +94,7 @@ public class YarnTaskManagerRunner {
 		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
 
 		String localKeytabPath = null;
-		if(remoteKeytabPath != null) {
+		if (remoteKeytabPath != null) {
 			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 			localKeytabPath = f.getAbsolutePath();
 			LOG.info("localKeytabPath: {}", localKeytabPath);
@@ -104,7 +103,7 @@ public class YarnTaskManagerRunner {
 		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 
 		LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
-				currentUser.getShortUserName(), yarnClientUsername );
+				currentUser.getShortUserName(), yarnClientUsername);
 
 		// Infer the resource identifier from the environment variable
 		String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
@@ -153,7 +152,7 @@ public class YarnTaskManagerRunner {
 					return null;
 				}
 			});
-		} catch(Exception e) {
+		} catch (Exception e) {
 			LOG.error("Exception occurred while launching Task Manager", e);
 			throw new RuntimeException(e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index 6ce8d17..aaa9bac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -15,19 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.yarn.cli;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.YarnClusterClientV2;
 import org.apache.flink.yarn.YarnClusterDescriptorV2;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,27 +50,27 @@ import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
 public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class);
 
-	/** The id for the CommandLine interface */
+	/** The id for the CommandLine interface. */
 	private static final String ID = "yarn";
 
 	private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
 
 	//------------------------------------ Command Line argument options -------------------------
 	// the prefix transformation is used by the CliFrontend static constructor.
-	private final Option QUEUE;
-	private final Option SHIP_PATH;
-	private final Option FLINK_JAR;
-	private final Option JM_MEMORY;
-	private final Option DETACHED;
-	private final Option ZOOKEEPER_NAMESPACE;
+	private final Option queue;
+	private final Option shipPath;
+	private final Option flinkJar;
+	private final Option jmMemory;
+	private final Option detached;
+	private final Option zookeeperNamespace;
 
-	private final Options ALL_OPTIONS;
+	private final Options allOptions;
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>.
 	 */
-	private final Option DYNAMIC_PROPERTIES;
+	private final Option dynamicProperties;
 
 	//------------------------------------ Internal fields -------------------------
 	// use detach mode as default
@@ -76,22 +78,22 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 
 	public FlinkYarnCLI(String shortPrefix, String longPrefix) {
 
-		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
-		SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
-		FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
-		JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
-		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
-		DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
-		ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
-
-		ALL_OPTIONS = new Options();
-		ALL_OPTIONS.addOption(FLINK_JAR);
-		ALL_OPTIONS.addOption(JM_MEMORY);
-		ALL_OPTIONS.addOption(QUEUE);
-		ALL_OPTIONS.addOption(SHIP_PATH);
-		ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
-		ALL_OPTIONS.addOption(DETACHED);
-		ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE);
+		queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+		shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+		jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+		dynamicProperties = new Option(shortPrefix + "D", true, "Dynamic properties");
+		detached = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached");
+		zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
+
+		allOptions = new Options();
+		allOptions.addOption(flinkJar);
+		allOptions.addOption(jmMemory);
+		allOptions.addOption(queue);
+		allOptions.addOption(shipPath);
+		allOptions.addOption(dynamicProperties);
+		allOptions.addOption(detached);
+		allOptions.addOption(zookeeperNamespace);
 	}
 
 	public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) {
@@ -100,8 +102,8 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 
 		// Jar Path
 		Path localJarPath;
-		if (cmd.hasOption(FLINK_JAR.getOpt())) {
-			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+		if (cmd.hasOption(flinkJar.getOpt())) {
+			String userPath = cmd.getOptionValue(flinkJar.getOpt());
 			if (!userPath.startsWith("file://")) {
 				userPath = "file://" + userPath;
 			}
@@ -117,7 +119,7 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 				localJarPath = new Path(new File(decodedPath).toURI());
 			} catch (UnsupportedEncodingException e) {
 				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
-					" Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option.");
+					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
 			}
 		}
 
@@ -125,8 +127,8 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 
 		List<File> shipFiles = new ArrayList<>();
 		// path to directory to ship
-		if (cmd.hasOption(SHIP_PATH.getOpt())) {
-			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+		if (cmd.hasOption(shipPath.getOpt())) {
+			String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
 			File shipDir = new File(shipPath);
 			if (shipDir.isDirectory()) {
 				shipFiles.add(shipDir);
@@ -138,36 +140,36 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 		yarnClusterDescriptor.addShipFiles(shipFiles);
 
 		// queue
-		if (cmd.hasOption(QUEUE.getOpt())) {
-			yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+		if (cmd.hasOption(queue.getOpt())) {
+			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
 		}
 
 		// JobManager Memory
-		if (cmd.hasOption(JM_MEMORY.getOpt())) {
-			int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+		if (cmd.hasOption(jmMemory.getOpt())) {
+			int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
 			yarnClusterDescriptor.setJobManagerMemory(jmMemory);
 		}
 
 		String[] dynamicProperties = null;
-		if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
-			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		if (cmd.hasOption(this.dynamicProperties.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(this.dynamicProperties.getOpt());
 		}
 		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
 
 		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
-		if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+		if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
 			// TODO: not support non detach mode now.
 			//this.detachedMode = false;
 		}
 		yarnClusterDescriptor.setDetachedMode(this.detachedMode);
 
-		if(defaultApplicationName != null) {
+		if (defaultApplicationName != null) {
 			yarnClusterDescriptor.setName(defaultApplicationName);
 		}
 
-		if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) {
-			String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt());
+		if (cmd.hasOption(zookeeperNamespace.getOpt())) {
+			String zookeeperNamespace = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
 			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
 		}
 
@@ -201,7 +203,7 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 
 	@Override
 	public void addRunOptions(Options baseOptions) {
-		for (Object option : ALL_OPTIONS.getOptions()) {
+		for (Object option : allOptions.getOptions()) {
 			baseOptions.addOption((Option) option);
 		}
 	}
@@ -233,9 +235,6 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 		return new YarnClusterClientV2(yarnClusterDescriptor, config);
 	}
 
-	/**
-	 * Utility method
-	 */
 	private void logAndSysout(String message) {
 		LOG.info(message);
 		System.out.println(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 53253d6..f15314a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -15,15 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.yarn.cli;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
@@ -38,6 +32,14 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
@@ -79,7 +81,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	private static final int CLIENT_POLLING_INTERVALL = 3;
 
-	/** The id for the CommandLine interface */
+	/** The id for the CommandLine interface. */
 	private static final String ID = "yarn-cluster";
 
 	// YARN-session related constants
@@ -92,19 +94,19 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	//------------------------------------ Command Line argument options -------------------------
 	// the prefix transformation is used by the CliFrontend static constructor.
-	private final Option QUERY;
+	private final Option query;
 	// --- or ---
-	private final Option APPLICATION_ID;
+	private final Option applicationId;
 	// --- or ---
-	private final Option QUEUE;
-	private final Option SHIP_PATH;
-	private final Option FLINK_JAR;
-	private final Option JM_MEMORY;
-	private final Option TM_MEMORY;
-	private final Option CONTAINER;
-	private final Option SLOTS;
-	private final Option DETACHED;
-	private final Option ZOOKEEPER_NAMESPACE;
+	private final Option queue;
+	private final Option shipPath;
+	private final Option flinkJar;
+	private final Option jmMemory;
+	private final Option tmMemory;
+	private final Option container;
+	private final Option slots;
+	private final Option detached;
+	private final Option zookeeperNamespace;
 
 	/**
 	 * @deprecated Streaming mode has been deprecated without replacement. Set the
@@ -112,16 +114,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	 * key to true to get the previous batch mode behaviour.
 	 */
 	@Deprecated
-	private final Option STREAMING;
-	private final Option NAME;
+	private final Option streaming;
+	private final Option name;
 
-	private final Options ALL_OPTIONS;
+	private final Options allOptions;
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>.
 	 */
-	private final Option DYNAMIC_PROPERTIES;
+	private final Option dynamicproperties;
 
 	private final boolean acceptInteractiveInput;
 
@@ -136,41 +138,40 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
-		QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
-		APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
-		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
-		SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
-		FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
-		JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
-		TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
-		CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
-		SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
-		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
-		DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
-		STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
-		NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
-		ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
-
-		ALL_OPTIONS = new Options();
-		ALL_OPTIONS.addOption(FLINK_JAR);
-		ALL_OPTIONS.addOption(JM_MEMORY);
-		ALL_OPTIONS.addOption(TM_MEMORY);
-		ALL_OPTIONS.addOption(CONTAINER);
-		ALL_OPTIONS.addOption(QUEUE);
-		ALL_OPTIONS.addOption(QUERY);
-		ALL_OPTIONS.addOption(SHIP_PATH);
-		ALL_OPTIONS.addOption(SLOTS);
-		ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
-		ALL_OPTIONS.addOption(DETACHED);
-		ALL_OPTIONS.addOption(STREAMING);
-		ALL_OPTIONS.addOption(NAME);
-		ALL_OPTIONS.addOption(APPLICATION_ID);
-		ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE);
+		query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
+		applicationId = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
+		queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+		shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+		jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+		tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+		container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
+		slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
+		dynamicproperties = new Option(shortPrefix + "D", true, "Dynamic properties");
+		detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
+		streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
+		name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
+		zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
+
+		allOptions = new Options();
+		allOptions.addOption(flinkJar);
+		allOptions.addOption(jmMemory);
+		allOptions.addOption(tmMemory);
+		allOptions.addOption(container);
+		allOptions.addOption(queue);
+		allOptions.addOption(query);
+		allOptions.addOption(shipPath);
+		allOptions.addOption(slots);
+		allOptions.addOption(dynamicproperties);
+		allOptions.addOption(detached);
+		allOptions.addOption(streaming);
+		allOptions.addOption(name);
+		allOptions.addOption(applicationId);
+		allOptions.addOption(zookeeperNamespace);
 	}
 
-
 	/**
-	 * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful
+	 * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful.
 	 * @param cmdLine The command-line parameters
 	 * @param flinkConfiguration The flink configuration
 	 * @return Yarn application id or null if none could be retrieved
@@ -184,8 +185,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		}
 
 		for (Option option : cmdLine.getOptions()) {
-			if (ALL_OPTIONS.hasOption(option.getOpt())) {
-				if (!option.getOpt().equals(DETACHED.getOpt())) {
+			if (allOptions.hasOption(option.getOpt())) {
+				if (!option.getOpt().equals(detached.getOpt())) {
 					// don't resume from properties file if yarn options have been specified
 					return null;
 				}
@@ -257,17 +258,17 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
 
-		if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
-			LOG.error("Missing required argument {}", CONTAINER.getOpt());
+		if (!cmd.hasOption(container.getOpt())) { // number of containers is required option!
+			LOG.error("Missing required argument {}", container.getOpt());
 			printUsage();
-			throw new IllegalArgumentException("Missing required argument " + CONTAINER.getOpt());
+			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
 		}
-		yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
+		yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(container.getOpt())));
 
 		// Jar Path
 		Path localJarPath;
-		if (cmd.hasOption(FLINK_JAR.getOpt())) {
-			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+		if (cmd.hasOption(flinkJar.getOpt())) {
+			String userPath = cmd.getOptionValue(flinkJar.getOpt());
 			if (!userPath.startsWith("file://")) {
 				userPath = "file://" + userPath;
 			}
@@ -283,7 +284,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				localJarPath = new Path(new File(decodedPath).toURI());
 			} catch (UnsupportedEncodingException e) {
 				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
-					" Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option.");
+					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
 			}
 		}
 
@@ -291,8 +292,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		List<File> shipFiles = new ArrayList<>();
 		// path to directory to ship
-		if (cmd.hasOption(SHIP_PATH.getOpt())) {
-			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+		if (cmd.hasOption(shipPath.getOpt())) {
+			String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
 			File shipDir = new File(shipPath);
 			if (shipDir.isDirectory()) {
 				shipFiles.add(shipDir);
@@ -304,51 +305,51 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		yarnClusterDescriptor.addShipFiles(shipFiles);
 
 		// queue
-		if (cmd.hasOption(QUEUE.getOpt())) {
-			yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+		if (cmd.hasOption(queue.getOpt())) {
+			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
 		}
 
 		// JobManager Memory
-		if (cmd.hasOption(JM_MEMORY.getOpt())) {
-			int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+		if (cmd.hasOption(jmMemory.getOpt())) {
+			int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
 			yarnClusterDescriptor.setJobManagerMemory(jmMemory);
 		}
 
 		// Task Managers memory
-		if (cmd.hasOption(TM_MEMORY.getOpt())) {
-			int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
+		if (cmd.hasOption(tmMemory.getOpt())) {
+			int tmMemory = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt()));
 			yarnClusterDescriptor.setTaskManagerMemory(tmMemory);
 		}
 
-		if (cmd.hasOption(SLOTS.getOpt())) {
-			int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
+		if (cmd.hasOption(slots.getOpt())) {
+			int slots = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt()));
 			yarnClusterDescriptor.setTaskManagerSlots(slots);
 		}
 
 		String[] dynamicProperties = null;
-		if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
-			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		if (cmd.hasOption(dynamicproperties.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(dynamicproperties.getOpt());
 		}
 		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
 
 		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
-		if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+		if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
 			this.detachedMode = true;
 			yarnClusterDescriptor.setDetachedMode(true);
 		}
 
-		if(cmd.hasOption(NAME.getOpt())) {
-			yarnClusterDescriptor.setName(cmd.getOptionValue(NAME.getOpt()));
+		if (cmd.hasOption(name.getOpt())) {
+			yarnClusterDescriptor.setName(cmd.getOptionValue(name.getOpt()));
 		} else {
 			// set the default application name, if none is specified
-			if(defaultApplicationName != null) {
+			if (defaultApplicationName != null) {
 				yarnClusterDescriptor.setName(defaultApplicationName);
 			}
 		}
 
-		if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) {
-			String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt());
+		if (cmd.hasOption(zookeeperNamespace.getOpt())) {
+			String zookeeperNamespace = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
 			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
 		}
 
@@ -368,7 +369,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			String message = "The YARN cluster has " + maxSlots + " slots available, " +
 				"but the user requested a parallelism of " + userParallelism + " on YARN. " +
 				"Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " +
-				"will get "+slotsPerTM+" slots.";
+				"will get " + slotsPerTM + " slots.";
 			logAndSysout(message);
 			yarnClusterDescriptor.setTaskManagerSlots(slotsPerTM);
 		}
@@ -383,7 +384,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		formatter.setLeftPadding(5);
 		formatter.setSyntaxPrefix("   Required");
 		Options req = new Options();
-		req.addOption(CONTAINER);
+		req.addOption(container);
 		formatter.printHelp(" ", req);
 
 		formatter.setSyntaxPrefix("   Optional");
@@ -403,7 +404,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean readConsoleInput) {
-		final String HELP = "Available commands:\n" +
+		final String help = "Available commands:\n" +
 				"help - show these commands\n" +
 				"stop - stop the YARN session";
 		int numTaskmanagers = 0;
@@ -443,8 +444,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
 				long startTime = System.currentTimeMillis();
 				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
-						&& (!readConsoleInput || !in.ready()))
-				{
+						&& (!readConsoleInput || !in.ready())) {
 					Thread.sleep(200);
 				}
 				//------------- handle interactive command by user. ----------------------
@@ -458,10 +458,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 							break label;
 
 						case "help":
-							System.err.println(HELP);
+							System.err.println(help);
 							break;
 						default:
-							System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP);
+							System.err.println("Unknown command '" + command + "'. Showing help: \n" + help);
 							break;
 					}
 				}
@@ -471,7 +471,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 					break;
 				}
 			}
-		} catch(Exception e) {
+		} catch (Exception e) {
 			LOG.warn("Exception while running the interactive command line interface", e);
 		}
 	}
@@ -493,7 +493,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public boolean isActive(CommandLine commandLine, Configuration configuration) {
 		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
 		boolean yarnJobManager = ID.equals(jobManagerOption);
-		boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt());
+		boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
 		return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null;
 	}
 
@@ -504,14 +504,14 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	@Override
 	public void addRunOptions(Options baseOptions) {
-		for (Object option : ALL_OPTIONS.getOptions()) {
+		for (Object option : allOptions.getOptions()) {
 			baseOptions.addOption((Option) option);
 		}
 	}
 
 	@Override
 	public void addGeneralOptions(Options baseOptions) {
-		baseOptions.addOption(APPLICATION_ID);
+		baseOptions.addOption(applicationId);
 	}
 
 	@Override
@@ -520,13 +520,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			Configuration config) throws UnsupportedOperationException {
 
 		// first check for an application id, then try to load from yarn properties
-		String applicationID = cmdLine.hasOption(APPLICATION_ID.getOpt()) ?
-				cmdLine.getOptionValue(APPLICATION_ID.getOpt())
+		String applicationID = cmdLine.hasOption(applicationId.getOpt()) ?
+				cmdLine.getOptionValue(applicationId.getOpt())
 				: loadYarnPropertiesFile(cmdLine, config);
 
-		if(null != applicationID) {
-			String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
-					cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
+		if (null != applicationID) {
+			String zkNamespace = cmdLine.hasOption(zookeeperNamespace.getOpt()) ?
+					cmdLine.getOptionValue(zookeeperNamespace.getOpt())
 					: config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
 			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
@@ -570,39 +570,39 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		CommandLine cmd;
 		try {
 			cmd = parser.parse(options, args);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			System.out.println(e.getMessage());
 			printUsage();
 			return 1;
 		}
 
 		// Query cluster for metrics
-		if (cmd.hasOption(QUERY.getOpt())) {
+		if (cmd.hasOption(query.getOpt())) {
 			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 			String description;
 			try {
 				description = yarnDescriptor.getClusterDescription();
 			} catch (Exception e) {
-				System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
+				System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
 				e.printStackTrace(System.err);
 				return 1;
 			}
 			System.out.println(description);
 			return 0;
-		} else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
+		} else if (cmd.hasOption(applicationId.getOpt())) {
 
 			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 
 			//configure ZK namespace depending on the value passed
-			String zkNamespace = cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
-									cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
-									:yarnDescriptor.getFlinkConfiguration()
-									.getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(APPLICATION_ID.getOpt()));
+			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
+									cmd.getOptionValue(zookeeperNamespace.getOpt())
+									: yarnDescriptor.getFlinkConfiguration()
+									.getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(applicationId.getOpt()));
 			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
 			yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 
 			try {
-				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt()));
+				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
 			} catch (Exception e) {
 				throw new RuntimeException("Could not retrieve existing Yarn application", e);
 			}
@@ -610,7 +610,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			if (detachedMode) {
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-					"yarn application -kill " + APPLICATION_ID.getOpt());
+					"yarn application -kill " + applicationId.getOpt());
 				yarnCluster.disconnect();
 			} else {
 				runInteractiveCli(yarnCluster, true);
@@ -629,7 +629,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			try {
 				yarnCluster = yarnDescriptor.deploy();
 			} catch (Exception e) {
-				System.err.println("Error while deploying YARN cluster: "+e.getMessage());
+				System.err.println("Error while deploying YARN cluster: " + e.getMessage());
 				e.printStackTrace(System.err);
 				return 1;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 8839c1e..28ef2ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -24,7 +24,8 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class holds configuration constants used by Flink's YARN runners.
- * These options are not expected to be ever configured by users explicitly. 
+ *
+ * <p>These options are not expected to be ever configured by users explicitly.
  */
 public class YarnConfigOptions {
 
@@ -50,11 +51,11 @@ public class YarnConfigOptions {
 	public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
 		key("yarn.per-job-cluster.include-user-jar")
 			.defaultValue("ORDER");
-	
+
 
 	// ------------------------------------------------------------------------
 
-	/** This class is not meant to be instantiated */
+	/** This class is not meant to be instantiated. */
 	private YarnConfigOptions() {}
 
 	/** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index f81d040..d94921e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -39,20 +39,20 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The basis of {@link HighAvailabilityServices} for YARN setups.
  * These high-availability services auto-configure YARN's HDFS and the YARN application's
  * working directory to be used to store job recovery data.
- * 
+ *
  * <p>Note for implementers: This class locks access to and creation of services,
  * to make sure all services are properly shut down when shutting down this class.
  * To participate in the checks, overriding methods should frame method body with
  * calls to {@code enter()} and {@code exit()} as shown in the following pattern:
- * 
+ *
  * <pre>{@code
  * public LeaderRetrievalService getResourceManagerLeaderRetriever() {
  *     enter();
@@ -67,21 +67,21 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public abstract class YarnHighAvailabilityServices implements HighAvailabilityServices {
 
-	/** The name of the sub directory in which Flink stores the recovery data */
+	/** The name of the sub directory in which Flink stores the recovery data. */
 	public static final String FLINK_RECOVERY_DATA_DIR = "flink_recovery_data";
 
-	/** Logger for these services, shared with subclasses */
+	/** Logger for these services, shared with subclasses. */
 	protected static final Logger LOG = LoggerFactory.getLogger(YarnHighAvailabilityServices.class);
 
 	// ------------------------------------------------------------------------
 
-	/** The lock that guards all accesses to methods in this class */
+	/** The lock that guards all accesses to methods in this class. */
 	private final ReentrantLock lock;
 
-	/** The Flink FileSystem object that represent the HDFS used by YARN */
+	/** The Flink FileSystem object that represents the HDFS used by YARN. */
 	protected final FileSystem flinkFileSystem;
 
-	/** The Hadoop FileSystem object that represent the HDFS used by YARN */
+	/** The Hadoop FileSystem object that represents the HDFS used by YARN. */
 	protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem;
 
 	/** The working directory of this YARN application.
@@ -89,13 +89,13 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	protected final Path workingDirectory;
 
 	/** The directory for HA persistent data. This should be deleted when the
-	 * HA services clean up */
+	 * HA services clean up. */
 	protected final Path haDataDirectory;
 
-	/** Blob store service to be used for the BlobServer and BlobCache */
+	/** Blob store service to be used for the BlobServer and BlobCache. */
 	protected final BlobStoreService blobStoreService;
 
-	/** Flag marking this instance as shut down */
+	/** Flag marking this instance as shut down. */
 	private volatile boolean closed;
 
 	// ------------------------------------------------------------------------
@@ -103,13 +103,13 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	/**
 	 * Creates new YARN high-availability services, configuring the file system and recovery
 	 * data directory based on the working directory in the given Hadoop configuration.
-	 * 
+	 *
 	 * <p>This class requires that the default Hadoop file system configured in the given
 	 * Hadoop configuration is an HDFS.
-	 * 
+	 *
 	 * @param config     The Flink configuration of this component / process.
 	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
 	 */
 	protected YarnHighAvailabilityServices(
@@ -280,7 +280,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	/**
 	 * Acquires the lock and checks whether the services are already closed. If they are
 	 * already closed, the method releases the lock and returns {@code false}.
-	 * 
+	 *
 	 * @return True, if the lock was acquired and the services are not closed, false if the services are closed.
 	 */
 	boolean enterUnlessClosed() {
@@ -307,12 +307,12 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	/**
 	 * Creates the high-availability services for a single-job Flink YARN application, to be
 	 * used in the Application Master that runs both ResourceManager and JobManager.
-	 * 
+	 *
 	 * @param flinkConfig  The Flink configuration.
 	 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
-	 * 
+	 *
 	 * @return The created high-availability services.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the high-availability services could not be initialized.
 	 */
 	public static YarnHighAvailabilityServices forSingleJobAppMaster(

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
index abfdb5c..accf8d5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -34,7 +34,7 @@ import java.util.concurrent.Executors;
 /**
  * These YarnHighAvailabilityServices are for the Application Master in setups where there is one
  * ResourceManager that is statically configured in the Flink configuration.
- * 
+ *
  * <h3>Handled failure types</h3>
  * <ul>
  *     <li><b>User code & operator failures:</b> Failed operators are recovered from checkpoints.</li>
@@ -51,11 +51,11 @@ import java.util.concurrent.Executors;
  * <p>Internally, these services put their recovery data into YARN's working directory,
  * except for checkpoints, which are in the configured checkpoint directory. That way,
  * checkpoints can be resumed with a new job/application, even if the complete YARN application
- * is killed and cleaned up. 
+ * is killed and cleaned up.
  *
  * <p>Because ResourceManager and JobManager run both in the same process (Application Master), they
  * use an embedded leader election service to find each other.
- * 
+ *
  * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
  * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
  * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
@@ -65,21 +65,21 @@ import java.util.concurrent.Executors;
  */
 public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
 
-	/** The dispatcher thread pool for these services */
+	/** The dispatcher thread pool for these services. */
 	private final ExecutorService dispatcher;
 
-	/** The embedded leader election service used by JobManagers to find the resource manager */
+	/** The embedded leader election service used by JobManagers to find the resource manager. */
 	private final SingleLeaderElectionService resourceManagerLeaderElectionService;
 
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration.
-	 * 
-	 * This constructor initializes access to the HDFS to store recovery data, and creates the
+	 *
+	 * <p>This constructor initializes access to the HDFS to store recovery data, and creates the
 	 * embedded leader election services through which ResourceManager and JobManager find and
 	 * confirm each other.
-	 * 
+	 *
 	 * @param config     The Flink configuration of this component / process.
 	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
index 9d05bbe..ae8f05b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -35,7 +35,7 @@ import java.io.IOException;
 /**
  * These YarnHighAvailabilityServices are for use by the TaskManager in setups,
  * where there is one ResourceManager that is statically configured in the Flink configuration.
- * 
+ *
  * <h3>Handled failure types</h3>
  * <ul>
  *     <li><b>User code & operator failures:</b> Failed operators are recovered from checkpoints.</li>
@@ -52,7 +52,7 @@ import java.io.IOException;
  * <p>Internally, these services put their recovery data into YARN's working directory,
  * except for checkpoints, which are in the configured checkpoint directory. That way,
  * checkpoints can be resumed with a new job/application, even if the complete YARN application
- * is killed and cleaned up. 
+ * is killed and cleaned up.
  *
  * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
  * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
@@ -63,7 +63,7 @@ import java.io.IOException;
  */
 public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServices {
 
-	/** The RPC URL under which the single ResourceManager can be reached while available */ 
+	/** The RPC URL under which the single ResourceManager can be reached while available. */
 	private final String resourceManagerRpcUrl;
 
 	// ------------------------------------------------------------------------
@@ -72,7 +72,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 	 * Creates new YarnPreConfiguredMasterHaServices for the given Flink and YARN configuration.
 	 * This constructor parses the ResourceManager address from the Flink configuration and sets
 	 * up the HDFS access to store recovery data in the YARN application's working directory.
-	 * 
+	 *
 	 * @param config     The Flink configuration of this component / process.
 	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
 	 *
@@ -97,7 +97,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 			final int rmPort = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT);
 
 			if (rmHost == null) {
-				throw new IllegalConfigurationException("Config parameter '" + 
+				throw new IllegalConfigurationException("Config parameter '" +
 						YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing.");
 			}
 			if (rmPort < 0) {
@@ -105,7 +105,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 						YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing.");
 			}
 			if (rmPort <= 0 || rmPort >= 65536) {
-				throw new IllegalConfigurationException("Invalid value for '" + 
+				throw new IllegalConfigurationException("Invalid value for '" +
 						YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]");
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java
index 2648e44..f11063f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn.messages;
 
 import org.apache.flink.yarn.YarnFlinkResourceManager;
+
 import org.apache.hadoop.yarn.api.records.Container;
 
 import java.util.List;
@@ -26,17 +27,17 @@ import java.util.List;
 /**
  * Message sent by the callback handler to the {@link YarnFlinkResourceManager}
  * to notify it that a set of new containers is available.
- * 
- * NOTE: This message is not serializable, because the Container object is not serializable.
+ *
+ * <p>NOTE: This message is not serializable, because the Container object is not serializable.
  */
 public class ContainersAllocated {
-	
+
 	private final List<Container> containers;
-	
+
 	public ContainersAllocated(List<Container> containers) {
 		this.containers = containers;
 	}
-	
+
 	public List<Container> containers() {
 		return containers;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java
index 65bafbc..5b43835 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.yarn.messages;
 
-
 import org.apache.flink.yarn.YarnFlinkResourceManager;
+
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 
 import java.util.List;
@@ -27,17 +27,17 @@ import java.util.List;
 /**
  * Message sent by the callback handler to the {@link YarnFlinkResourceManager}
  * to notify it that a set of new containers is complete.
- * 
- * NOTE: This message is not serializable, because the ContainerStatus object is not serializable.
+ *
+ * <p>NOTE: This message is not serializable, because the ContainerStatus object is not serializable.
  */
 public class ContainersComplete {
-	
+
 	private final List<ContainerStatus> containers;
-	
+
 	public ContainersComplete(List<ContainerStatus> containers) {
 		this.containers = containers;
 	}
-	
+
 	public List<ContainerStatus> containers() {
 		return containers;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/resources/log4j.properties b/flink-yarn/src/main/resources/log4j.properties
index 749796f..b2ad0d3 100644
--- a/flink-yarn/src/main/resources/log4j.properties
+++ b/flink-yarn/src/main/resources/log4j.properties
@@ -16,7 +16,6 @@
 # limitations under the License.
 ################################################################################
 
-
 # Convenience file for local debugging of the JobManager/TaskManager.
 log4j.rootLogger=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 35d5f56..9ac96a3 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -25,11 +25,11 @@ import grizzled.slf4j.Logger
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
-import org.apache.flink.runtime.{LeaderSessionMessageFilter, FlinkActor, LogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.yarn.YarnMessages._
+
 import scala.collection.mutable
 import scala.concurrent.duration._
-
 import scala.language.postfixOps
 
 /** Actor which is responsible to repeatedly poll the Yarn cluster status from the ResourceManager.
@@ -187,7 +187,7 @@ class ApplicationClient(
 
     // locally forward messages
     case LocalGetYarnMessage =>
-      if(messagesQueue.nonEmpty) {
+      if (messagesQueue.nonEmpty) {
         sender() ! decorateMessage(Option(messagesQueue.dequeue()))
       } else {
         sender() ! decorateMessage(None)

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index e094bb7..d78b390 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-
 /** JobManager actor for execution on Yarn. It enriches the [[JobManager]] with additional messages
   * to start/administer/stop the Yarn session.
   *

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index ada2631..1636e09 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -18,13 +18,12 @@
 
 package org.apache.flink.yarn
 
-import java.util.{Date, UUID, List => JavaList}
+import java.util.{UUID, List => JavaList}
 
-import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, FinalApplicationStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index b7f4c9a..e37ff6f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,9 +23,9 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
index f03c604..d283c3b 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -26,6 +25,8 @@ import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
 import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+
+import akka.actor.ActorRef;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
@@ -35,6 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import java.util.Comparator;
 import java.util.PriorityQueue;
 
+/**
+ * A test extension to the {@link YarnFlinkResourceManager} that can handle additional test messages.
+ */
 public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
 
 	private final PriorityQueue<Tuple2<Integer, ActorRef>> waitingQueue = new PriorityQueue<>(32, new Comparator<Tuple2<Integer, ActorRef>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index a09c5b2..a5ec176 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -36,6 +31,12 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
 import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -53,11 +54,6 @@ import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -66,11 +62,20 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link Utils}.
+ */
 public class UtilsTest extends TestLogger {
 
 	private static ActorSystem system;
@@ -99,7 +104,7 @@ public class UtilsTest extends TestLogger {
 			String applicationMasterHostName = "localhost";
 			String webInterfaceURL = "foobar";
 			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
-				1l, 1l, 1l, 1, new HashMap<String, String>());
+				1L, 1L, 1L, 1, new HashMap<String, String>());
 			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
 			int yarnHeartbeatIntervalMillis = 1000;
 			int maxFailedContainers = 10;
@@ -203,7 +208,7 @@ public class UtilsTest extends TestLogger {
 					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
 				}
 
-				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft());
+				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft());
 
 				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index 4884dd0..19a0352 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.yarn;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.OperatingSystem;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assume;
@@ -38,7 +39,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Map;
 
-import static org.apache.flink.yarn.YarnConfigKeys.*;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
@@ -46,6 +52,9 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for the {@link YarnApplicationMasterRunner}.
+ */
 public class YarnApplicationMasterRunnerTest {
 	private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
 
@@ -81,7 +90,7 @@ public class YarnApplicationMasterRunnerTest {
 			}
 		}).when(yarnConf).getStrings(anyString(), Mockito.<String> anyVararg());
 
-		Map<String, String> env = ImmutableMap. <String, String> builder()
+		Map<String, String> env = ImmutableMap.<String, String> builder()
 			.put(ENV_APP_ID, "foo")
 			.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath())
 			.put(ENV_CLIENT_SHIP_FILES, "")

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index a7204da..9326723 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -22,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.junit.Before;
@@ -32,9 +34,12 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link YarnClusterDescriptor}.
+ */
 public class YarnClusterDescriptorTest {
 
 	@Rule
@@ -140,7 +145,7 @@ public class YarnClusterDescriptorTest {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 +// jvmOpts
+				" " + " " + krb5 + // jvmOpts
 				" " + // logging
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
@@ -159,7 +164,7 @@ public class YarnClusterDescriptorTest {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 +// jvmOpts
+				" " + " " + krb5 + // jvmOpts
 				" " + logfile + " " + logback +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
@@ -178,7 +183,7 @@ public class YarnClusterDescriptorTest {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 +// jvmOpts
+				" " + " " + krb5 + // jvmOpts
 				" " + logfile + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
@@ -197,7 +202,7 @@ public class YarnClusterDescriptorTest {
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + " " + krb5 +// jvmOpts
+				" " + " " + krb5 + // jvmOpts
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
@@ -210,16 +215,16 @@ public class YarnClusterDescriptorTest {
 			java + " " + jvmmem +
 				" " + jvmOpts +
 				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " "+ redirects,
+				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(true, true, false)
 				.getCommands().get(0));
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + jvmOpts + " " + krb5 +// jvmOpts
+				" " + jvmOpts + " " + krb5 + // jvmOpts
 				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " "+ redirects,
+				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(true, true, true)
 				.getCommands().get(0));
@@ -230,16 +235,16 @@ public class YarnClusterDescriptorTest {
 			java + " " + jvmmem +
 				" " + jvmOpts + " " + jmJvmOpts +
 				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " "+ redirects,
+				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(true, true, false)
 				.getCommands().get(0));
 
 		assertEquals(
 			java + " " + jvmmem +
-				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 +// jvmOpts
+				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
 				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " "+ redirects,
+				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(true, true, true)
 				.getCommands().get(0));

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
index b4d2ba8..bdc7863 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -26,10 +26,9 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.StringUtils;
-
 import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
@@ -37,7 +36,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -53,6 +51,9 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for YarnIntraNonHaMasterServices.
+ */
 public class YarnIntraNonHaMasterServicesTest extends TestLogger {
 
 	private static final Random RND = new Random();
@@ -60,9 +61,9 @@ public class YarnIntraNonHaMasterServicesTest extends TestLogger {
 	@ClassRule
 	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
 
-	private static MiniDFSCluster HDFS_CLUSTER;
+	private static MiniDFSCluster hdfsCluster;
 
-	private static Path HDFS_ROOT_PATH;
+	private static Path hdfsRootPath;
 
 	private org.apache.hadoop.conf.Configuration hadoopConfig;
 
@@ -80,23 +81,23 @@ public class YarnIntraNonHaMasterServicesTest extends TestLogger {
 		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
 
 		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-		HDFS_CLUSTER = builder.build();
-		HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+		hdfsCluster = builder.build();
+		hdfsRootPath = new Path(hdfsCluster.getURI());
 	}
 
 	@AfterClass
 	public static void destroyHDFS() {
-		if (HDFS_CLUSTER != null) {
-			HDFS_CLUSTER.shutdown();
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
 		}
-		HDFS_CLUSTER = null;
-		HDFS_ROOT_PATH = null;
+		hdfsCluster = null;
+		hdfsRootPath = null;
 	}
 
 	@Before
 	public void initConfig() {
 		hadoopConfig = new org.apache.hadoop.conf.Configuration();
-		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
 	}
 
 	// ------------------------------------------------------------------------


Mime
View raw message