Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D8E3E200CA5 for ; Fri, 26 May 2017 21:16:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D7987160BC8; Fri, 26 May 2017 19:16:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B7E68160BE2 for ; Fri, 26 May 2017 21:16:13 +0200 (CEST) Received: (qmail 70938 invoked by uid 500); 26 May 2017 19:16:12 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 69984 invoked by uid 99); 26 May 2017 19:16:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 May 2017 19:16:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2CE73F2181; Fri, 26 May 2017 19:16:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chesnay@apache.org To: commits@flink.apache.org Date: Fri, 26 May 2017 19:16:23 -0000 Message-Id: <8a5ef8569fe243ef9546220e358e5445@git.apache.org> In-Reply-To: <1b42c15ef80a4ff6b04aea1beb9e93e1@git.apache.org> References: <1b42c15ef80a4ff6b04aea1beb9e93e1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/15] flink git commit: [FLINK-6701] Activate strict checkstyle for flink-yarn archived-at: Fri, 26 May 2017 19:16:16 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 63e6a4c..6099d18 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -37,75 +37,74 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; + import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; -import org.apache.flink.util.ExceptionUtils; -import java.util.Map; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.FiniteDuration; + 
/** * The yarn implementation of the resource manager. Used when the system is started * via the resource framework YARN. */ public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler { - protected final Logger LOG = LoggerFactory.getLogger(getClass()); - /** The process environment variables */ - private final Map ENV; + /** The process environment variables. */ + private final Map env; /** The default registration timeout for task executor in seconds. */ - private final static int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300; + private static final int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300; - /** The heartbeat interval while the resource master is waiting for containers */ + /** The heartbeat interval while the resource master is waiting for containers. */ private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; - /** The default heartbeat interval during regular operation */ + /** The default heartbeat interval during regular operation. */ private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; - /** The default memory of task executor to allocate (in MB) */ + /** The default memory of task executor to allocate (in MB). */ private static final int DEFAULT_TSK_EXECUTOR_MEMORY_SIZE = 1024; /** Environment variable name of the final container id used by the YarnResourceManager. * Container ID generation may vary across Hadoop versions. */ - final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; - + static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; + /** Environment variable name of the hostname given by the YARN. 
* In task executor we use the hostnames given by YARN consistently throughout akka */ - final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; + static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; - /** Default heartbeat interval between this resource manager and the YARN ResourceManager */ + /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */ private final int yarnHeartbeatIntervalMillis; private final Configuration flinkConfig; private final YarnConfiguration yarnConfig; - /** Client to communicate with the Resource Manager (YARN's master) */ + /** Client to communicate with the Resource Manager (YARN's master). */ private AMRMClientAsync resourceManagerClient; - /** Client to communicate with the Node manager and launch TaskExecutor processes */ + /** Client to communicate with the Node manager and launch TaskExecutor processes. */ private NMClient nodeManagerClient; - /** The number of containers requested, but not yet granted */ + /** The number of containers requested, but not yet granted. 
*/ private int numPendingContainerRequests; - final private Map resourcePriorities = new HashMap<>(); + private final Map resourcePriorities = new HashMap<>(); public YarnResourceManager( RpcService rpcService, @@ -133,7 +132,7 @@ public class YarnResourceManager extends ResourceManager implements fatalErrorHandler); this.flinkConfig = flinkConfig; this.yarnConfig = new YarnConfiguration(); - this.ENV = env; + this.env = env; final int yarnHeartbeatIntervalMS = flinkConfig.getInteger( ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000; @@ -161,7 +160,7 @@ public class YarnResourceManager extends ResourceManager implements //TODO: the third paramter should be the webmonitor address resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress()); } catch (Exception e) { - LOG.info("registerApplicationMaster fail", e); + log.info("registerApplicationMaster fail", e); } // create the client to communicate with the node managers @@ -204,11 +203,11 @@ public class YarnResourceManager extends ResourceManager implements // first, de-register from YARN FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); - LOG.info("Unregistering application from the YARN Resource Manager"); + log.info("Unregistering application from the YARN Resource Manager"); try { resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, ""); } catch (Throwable t) { - LOG.error("Could not unregister the application master.", t); + log.error("Could not unregister the application master.", t); } } @@ -217,8 +216,8 @@ public class YarnResourceManager extends ResourceManager implements // Priority for worker containers - priorities are intra-application //TODO: set priority according to the resource allocated Priority priority = Priority.newInstance(generatePriority(resourceProfile)); - int mem = resourceProfile.getMemoryInMB() < 0 ? 
DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int)resourceProfile.getMemoryInMB(); - int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores(); + int mem = resourceProfile.getMemoryInMB() < 0 ? DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int) resourceProfile.getMemoryInMB(); + int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int) resourceProfile.getCpuCores(); Resource capability = Resource.newInstance(mem, vcore); requestYarnContainer(capability, priority); } @@ -254,7 +253,7 @@ public class YarnResourceManager extends ResourceManager implements public void onContainersAllocated(List containers) { for (Container container : containers) { numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); - LOG.info("Received new container: {} - Remaining pending container requests: {}", + log.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); try { /** Context information used to start a TaskExecutor Java process */ @@ -264,7 +263,7 @@ public class YarnResourceManager extends ResourceManager implements } catch (Throwable t) { // failed to launch the container, will release the failed one and ask for a new one - LOG.error("Could not start TaskManager in container {},", container, t); + log.error("Could not start TaskManager in container {},", container, t); resourceManagerClient.releaseAssignedContainer(container.getId()); requestYarnContainer(container.getResource(), container.getPriority()); } @@ -279,7 +278,7 @@ public class YarnResourceManager extends ResourceManager implements try { shutDown(); } catch (Exception e) { - LOG.warn("Fail to shutdown the YARN resource manager.", e); + log.warn("Fail to shutdown the YARN resource manager.", e); } } @@ -317,7 +316,7 @@ public class YarnResourceManager extends ResourceManager implements } } - // parse the host and port from akka address, + // parse the host and port from akka address, // the akka address is like 
akka.tcp://flink@100.81.153.180:49712/user/$a private static Tuple2 parseHostPort(String address) { String[] hostPort = address.split("@")[1].split(":"); @@ -333,35 +332,35 @@ public class YarnResourceManager extends ResourceManager implements resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); numPendingContainerRequests++; - LOG.info("Requesting new TaskManager container pending requests: {}", + log.info("Requesting new TaskManager container pending requests: {}", numPendingContainerRequests); } private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host) throws Exception { // init the ContainerLaunchContext - final String currDir = ENV.get(ApplicationConstants.Environment.PWD.key()); + final String currDir = env.get(ApplicationConstants.Environment.PWD.key()); final ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1); - LOG.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " + + log.info("TaskExecutor{} will be started with container size {} MB, JVM heap size {} MB, " + "JVM direct memory limit {} MB", containerId, taskManagerParameters.taskManagerTotalMemoryMB(), taskManagerParameters.taskManagerHeapSizeMB(), taskManagerParameters.taskManagerDirectMemoryLimitMB()); - int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, DEFAULT_TASK_MANAGER_REGISTRATION_DURATION); FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS); final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( flinkConfig, "", 0, 1, teRegistrationTimeout); - LOG.debug("TaskManager configuration: {}", taskManagerConfig); + log.debug("TaskManager configuration: {}", taskManagerConfig); ContainerLaunchContext 
taskExecutorLaunchContext = Utils.createTaskExecutorContext( - flinkConfig, yarnConfig, ENV, + flinkConfig, yarnConfig, env, taskManagerParameters, taskManagerConfig, - currDir, YarnTaskExecutorRunner.class, LOG); + currDir, YarnTaskExecutorRunner.class, log); // set a special environment variable to uniquely identify this container taskExecutorLaunchContext.getEnvironment() @@ -373,7 +372,6 @@ public class YarnResourceManager extends ResourceManager implements - /** * Generate priority by given resource profile. * Priority is only used for distinguishing request of different resource. http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java index 2372cbc..62729a4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java @@ -18,11 +18,11 @@ package org.apache.flink.yarn; -import akka.actor.ActorRef; import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; import org.apache.flink.yarn.messages.ContainersAllocated; import org.apache.flink.yarn.messages.ContainersComplete; +import akka.actor.ActorRef; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -37,16 +37,16 @@ import java.util.List; */ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.CallbackHandler { - /** The yarn master to which we report the callbacks */ + /** The yarn master to which we report the callbacks. 
*/ private ActorRef yarnFrameworkMaster; - /** The progress we report */ + /** The progress we report. */ private float currentProgress; public YarnResourceManagerCallbackHandler() { this(null); } - + public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) { this.yarnFrameworkMaster = yarnFrameworkMaster; } http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 398a5eb..2ed4c1d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.Preconditions; + import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -52,13 +53,12 @@ import java.util.concurrent.Callable; */ public class YarnTaskExecutorRunner { - /** Logger */ protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class); - /** The process environment variables */ + /** The process environment variables. */ private static final Map ENV = System.getenv(); - /** The exit code returned if the initialization of the yarn task executor runner failed */ + /** The exit code returned if the initialization of the yarn task executor runner failed. 
*/ private static final int INIT_ERROR_EXIT_CODE = 31; private MetricRegistry metricRegistry; @@ -131,7 +131,7 @@ public class YarnTaskExecutorRunner { configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); String keytabPath = null; - if(remoteKeytabPath != null) { + if (remoteKeytabPath != null) { File f = new File(currDir, Utils.KEYTAB_FILE_NAME); keytabPath = f.getAbsolutePath(); LOG.info("keytab path: {}", keytabPath); @@ -252,7 +252,6 @@ public class YarnTaskExecutorRunner { // Utilities // ------------------------------------------------------------------------ - protected void shutdown() { if (taskExecutorRpcService != null) { try { http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 047a1fa..265c5a6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -18,11 +18,6 @@ package org.apache.flink.yarn; -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.Callable; - import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -31,17 +26,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; - import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.Preconditions; + import 
org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Callable; + /** * The entry point for running a TaskManager in a YARN container. */ @@ -95,7 +94,7 @@ public class YarnTaskManagerRunner { configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); String localKeytabPath = null; - if(remoteKeytabPath != null) { + if (remoteKeytabPath != null) { File f = new File(currDir, Utils.KEYTAB_FILE_NAME); localKeytabPath = f.getAbsolutePath(); LOG.info("localKeytabPath: {}", localKeytabPath); @@ -104,7 +103,7 @@ public class YarnTaskManagerRunner { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", - currentUser.getShortUserName(), yarnClientUsername ); + currentUser.getShortUserName(), yarnClientUsername); // Infer the resource identifier from the environment variable String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID)); @@ -153,7 +152,7 @@ public class YarnTaskManagerRunner { return null; } }); - } catch(Exception e) { + } catch (Exception e) { LOG.error("Exception occurred while launching Task Manager", e); throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java index 6ce8d17..aaa9bac 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java +++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java @@ -15,19 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.yarn.cli; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.YarnClusterClientV2; import org.apache.flink.yarn.YarnClusterDescriptorV2; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,27 +50,27 @@ import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; public class FlinkYarnCLI implements CustomCommandLine { private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCLI.class); - /** The id for the CommandLine interface */ + /** The id for the CommandLine interface. */ private static final String ID = "yarn"; private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split() //------------------------------------ Command Line argument options ------------------------- // the prefix transformation is used by the CliFrontend static constructor. 
- private final Option QUEUE; - private final Option SHIP_PATH; - private final Option FLINK_JAR; - private final Option JM_MEMORY; - private final Option DETACHED; - private final Option ZOOKEEPER_NAMESPACE; + private final Option queue; + private final Option shipPath; + private final Option flinkJar; + private final Option jmMemory; + private final Option detached; + private final Option zookeeperNamespace; - private final Options ALL_OPTIONS; + private final Options allOptions; /** * Dynamic properties allow the user to specify additional configuration values with -D, such as - * -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624 + * -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624. */ - private final Option DYNAMIC_PROPERTIES; + private final Option dynamicProperties; //------------------------------------ Internal fields ------------------------- // use detach mode as default @@ -76,22 +78,22 @@ public class FlinkYarnCLI implements CustomCommandLine { public FlinkYarnCLI(String shortPrefix, String longPrefix) { - QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); - SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); - FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); - JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); - DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); - DETACHED = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached"); - ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); - - ALL_OPTIONS = new Options(); - ALL_OPTIONS.addOption(FLINK_JAR); - ALL_OPTIONS.addOption(JM_MEMORY); - 
ALL_OPTIONS.addOption(QUEUE); - ALL_OPTIONS.addOption(SHIP_PATH); - ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES); - ALL_OPTIONS.addOption(DETACHED); - ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE); + queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); + shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); + flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); + jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); + dynamicProperties = new Option(shortPrefix + "D", true, "Dynamic properties"); + detached = new Option(shortPrefix + "a", longPrefix + "attached", false, "Start attached"); + zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); + + allOptions = new Options(); + allOptions.addOption(flinkJar); + allOptions.addOption(jmMemory); + allOptions.addOption(queue); + allOptions.addOption(shipPath); + allOptions.addOption(dynamicProperties); + allOptions.addOption(detached); + allOptions.addOption(zookeeperNamespace); } public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) { @@ -100,8 +102,8 @@ public class FlinkYarnCLI implements CustomCommandLine { // Jar Path Path localJarPath; - if (cmd.hasOption(FLINK_JAR.getOpt())) { - String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); + if (cmd.hasOption(flinkJar.getOpt())) { + String userPath = cmd.getOptionValue(flinkJar.getOpt()); if (!userPath.startsWith("file://")) { userPath = "file://" + userPath; } @@ -117,7 +119,7 @@ public class FlinkYarnCLI implements CustomCommandLine { localJarPath = new Path(new File(decodedPath).toURI()); } catch (UnsupportedEncodingException e) { throw new RuntimeException("Couldn't decode the encoded Flink dist jar 
path: " + encodedJarPath + - " Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option."); + " Please supply a path manually via the -" + flinkJar.getOpt() + " option."); } } @@ -125,8 +127,8 @@ public class FlinkYarnCLI implements CustomCommandLine { List shipFiles = new ArrayList<>(); // path to directory to ship - if (cmd.hasOption(SHIP_PATH.getOpt())) { - String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); + if (cmd.hasOption(shipPath.getOpt())) { + String shipPath = cmd.getOptionValue(this.shipPath.getOpt()); File shipDir = new File(shipPath); if (shipDir.isDirectory()) { shipFiles.add(shipDir); @@ -138,36 +140,36 @@ public class FlinkYarnCLI implements CustomCommandLine { yarnClusterDescriptor.addShipFiles(shipFiles); // queue - if (cmd.hasOption(QUEUE.getOpt())) { - yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt())); + if (cmd.hasOption(queue.getOpt())) { + yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt())); } // JobManager Memory - if (cmd.hasOption(JM_MEMORY.getOpt())) { - int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); + if (cmd.hasOption(jmMemory.getOpt())) { + int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); } String[] dynamicProperties = null; - if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { - dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); + if (cmd.hasOption(this.dynamicProperties.getOpt())) { + dynamicProperties = cmd.getOptionValues(this.dynamicProperties.getOpt()); } String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR); yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); - if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { + if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { // TODO: not 
support non detach mode now. //this.detachedMode = false; } yarnClusterDescriptor.setDetachedMode(this.detachedMode); - if(defaultApplicationName != null) { + if (defaultApplicationName != null) { yarnClusterDescriptor.setName(defaultApplicationName); } - if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) { - String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()); + if (cmd.hasOption(zookeeperNamespace.getOpt())) { + String zookeeperNamespace = cmd.getOptionValue(this.zookeeperNamespace.getOpt()); yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace); } @@ -201,7 +203,7 @@ public class FlinkYarnCLI implements CustomCommandLine { @Override public void addRunOptions(Options baseOptions) { - for (Object option : ALL_OPTIONS.getOptions()) { + for (Object option : allOptions.getOptions()) { baseOptions.addOption((Option) option); } } @@ -233,9 +235,6 @@ public class FlinkYarnCLI implements CustomCommandLine { return new YarnClusterClientV2(yarnClusterDescriptor, config); } - /** - * Utility method - */ private void logAndSysout(String message) { LOG.info(message); System.out.println(message); http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 53253d6..f15314a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -15,15 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.yarn.cli; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; @@ -38,6 +32,14 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.yarn.YarnClusterDescriptor; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; @@ -79,7 +81,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine private static final int CLIENT_POLLING_INTERVALL = 3; - /** The id for the CommandLine interface */ + /** The id for the CommandLine interface. */ private static final String ID = "yarn-cluster"; // YARN-session related constants @@ -92,19 +94,19 @@ public class FlinkYarnSessionCli implements CustomCommandLine //------------------------------------ Command Line argument options ------------------------- // the prefix transformation is used by the CliFrontend static constructor. 
- private final Option QUERY; + private final Option query; // --- or --- - private final Option APPLICATION_ID; + private final Option applicationId; // --- or --- - private final Option QUEUE; - private final Option SHIP_PATH; - private final Option FLINK_JAR; - private final Option JM_MEMORY; - private final Option TM_MEMORY; - private final Option CONTAINER; - private final Option SLOTS; - private final Option DETACHED; - private final Option ZOOKEEPER_NAMESPACE; + private final Option queue; + private final Option shipPath; + private final Option flinkJar; + private final Option jmMemory; + private final Option tmMemory; + private final Option container; + private final Option slots; + private final Option detached; + private final Option zookeeperNamespace; /** * @deprecated Streaming mode has been deprecated without replacement. Set the @@ -112,16 +114,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine * key to true to get the previous batch mode behaviour. */ @Deprecated - private final Option STREAMING; - private final Option NAME; + private final Option streaming; + private final Option name; - private final Options ALL_OPTIONS; + private final Options allOptions; /** * Dynamic properties allow the user to specify additional configuration values with -D, such as - * -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624 + * -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624. 
*/ - private final Option DYNAMIC_PROPERTIES; + private final Option dynamicproperties; private final boolean acceptInteractiveInput; @@ -136,41 +138,40 @@ public class FlinkYarnSessionCli implements CustomCommandLine public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) { this.acceptInteractiveInput = acceptInteractiveInput; - QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); - APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session"); - QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); - SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); - FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); - JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); - TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]"); - CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)"); - SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"); - DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); - DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); - STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); - NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); - ZOOKEEPER_NAMESPACE = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high 
availability mode"); - - ALL_OPTIONS = new Options(); - ALL_OPTIONS.addOption(FLINK_JAR); - ALL_OPTIONS.addOption(JM_MEMORY); - ALL_OPTIONS.addOption(TM_MEMORY); - ALL_OPTIONS.addOption(CONTAINER); - ALL_OPTIONS.addOption(QUEUE); - ALL_OPTIONS.addOption(QUERY); - ALL_OPTIONS.addOption(SHIP_PATH); - ALL_OPTIONS.addOption(SLOTS); - ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES); - ALL_OPTIONS.addOption(DETACHED); - ALL_OPTIONS.addOption(STREAMING); - ALL_OPTIONS.addOption(NAME); - ALL_OPTIONS.addOption(APPLICATION_ID); - ALL_OPTIONS.addOption(ZOOKEEPER_NAMESPACE); + query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); + applicationId = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session"); + queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); + shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); + flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); + jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); + tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]"); + container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)"); + slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"); + dynamicproperties = new Option(shortPrefix + "D", true, "Dynamic properties"); + detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); + streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); + name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom 
name for the application on YARN"); + zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); + + allOptions = new Options(); + allOptions.addOption(flinkJar); + allOptions.addOption(jmMemory); + allOptions.addOption(tmMemory); + allOptions.addOption(container); + allOptions.addOption(queue); + allOptions.addOption(query); + allOptions.addOption(shipPath); + allOptions.addOption(slots); + allOptions.addOption(dynamicproperties); + allOptions.addOption(detached); + allOptions.addOption(streaming); + allOptions.addOption(name); + allOptions.addOption(applicationId); + allOptions.addOption(zookeeperNamespace); } - /** - * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful + * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful. * @param cmdLine The command-line parameters * @param flinkConfiguration The flink configuration * @return Yarn application id or null if none could be retrieved @@ -184,8 +185,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine } for (Option option : cmdLine.getOptions()) { - if (ALL_OPTIONS.hasOption(option.getOpt())) { - if (!option.getOpt().equals(DETACHED.getOpt())) { + if (allOptions.hasOption(option.getOpt())) { + if (!option.getOpt().equals(detached.getOpt())) { // don't resume from properties file if yarn options have been specified return null; } @@ -257,17 +258,17 @@ public class FlinkYarnSessionCli implements CustomCommandLine AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(); - if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! - LOG.error("Missing required argument {}", CONTAINER.getOpt()); + if (!cmd.hasOption(container.getOpt())) { // number of containers is required option! 
+ LOG.error("Missing required argument {}", container.getOpt()); printUsage(); - throw new IllegalArgumentException("Missing required argument " + CONTAINER.getOpt()); + throw new IllegalArgumentException("Missing required argument " + container.getOpt()); } - yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()))); + yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(container.getOpt()))); // Jar Path Path localJarPath; - if (cmd.hasOption(FLINK_JAR.getOpt())) { - String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); + if (cmd.hasOption(flinkJar.getOpt())) { + String userPath = cmd.getOptionValue(flinkJar.getOpt()); if (!userPath.startsWith("file://")) { userPath = "file://" + userPath; } @@ -283,7 +284,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine localJarPath = new Path(new File(decodedPath).toURI()); } catch (UnsupportedEncodingException e) { throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath + - " Please supply a path manually via the -" + FLINK_JAR.getOpt() + " option."); + " Please supply a path manually via the -" + flinkJar.getOpt() + " option."); } } @@ -291,8 +292,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine List shipFiles = new ArrayList<>(); // path to directory to ship - if (cmd.hasOption(SHIP_PATH.getOpt())) { - String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); + if (cmd.hasOption(shipPath.getOpt())) { + String shipPath = cmd.getOptionValue(this.shipPath.getOpt()); File shipDir = new File(shipPath); if (shipDir.isDirectory()) { shipFiles.add(shipDir); @@ -304,51 +305,51 @@ public class FlinkYarnSessionCli implements CustomCommandLine yarnClusterDescriptor.addShipFiles(shipFiles); // queue - if (cmd.hasOption(QUEUE.getOpt())) { - yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt())); + if (cmd.hasOption(queue.getOpt())) { + 
yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt())); } // JobManager Memory - if (cmd.hasOption(JM_MEMORY.getOpt())) { - int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); + if (cmd.hasOption(jmMemory.getOpt())) { + int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); } // Task Managers memory - if (cmd.hasOption(TM_MEMORY.getOpt())) { - int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt())); + if (cmd.hasOption(tmMemory.getOpt())) { + int tmMemory = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt())); yarnClusterDescriptor.setTaskManagerMemory(tmMemory); } - if (cmd.hasOption(SLOTS.getOpt())) { - int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt())); + if (cmd.hasOption(slots.getOpt())) { + int slots = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt())); yarnClusterDescriptor.setTaskManagerSlots(slots); } String[] dynamicProperties = null; - if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { - dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); + if (cmd.hasOption(dynamicproperties.getOpt())) { + dynamicProperties = cmd.getOptionValues(dynamicproperties.getOpt()); } String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR); yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); - if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { + if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) { this.detachedMode = true; yarnClusterDescriptor.setDetachedMode(true); } - if(cmd.hasOption(NAME.getOpt())) { - yarnClusterDescriptor.setName(cmd.getOptionValue(NAME.getOpt())); + if (cmd.hasOption(name.getOpt())) { + yarnClusterDescriptor.setName(cmd.getOptionValue(name.getOpt())); } else { // set the default application name, if none is specified - 
if(defaultApplicationName != null) { + if (defaultApplicationName != null) { yarnClusterDescriptor.setName(defaultApplicationName); } } - if (cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt())) { - String zookeeperNamespace = cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()); + if (cmd.hasOption(zookeeperNamespace.getOpt())) { + String zookeeperNamespace = cmd.getOptionValue(this.zookeeperNamespace.getOpt()); yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace); } @@ -368,7 +369,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine String message = "The YARN cluster has " + maxSlots + " slots available, " + "but the user requested a parallelism of " + userParallelism + " on YARN. " + "Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."; + "will get " + slotsPerTM + " slots."; logAndSysout(message); yarnClusterDescriptor.setTaskManagerSlots(slotsPerTM); } @@ -383,7 +384,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine formatter.setLeftPadding(5); formatter.setSyntaxPrefix(" Required"); Options req = new Options(); - req.addOption(CONTAINER); + req.addOption(container); formatter.printHelp(" ", req); formatter.setSyntaxPrefix(" Optional"); @@ -403,7 +404,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine } public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean readConsoleInput) { - final String HELP = "Available commands:\n" + + final String help = "Available commands:\n" + "help - show these commands\n" + "stop - stop the YARN session"; int numTaskmanagers = 0; @@ -443,8 +444,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine // wait until CLIENT_POLLING_INTERVAL is over or the user entered something. 
long startTime = System.currentTimeMillis(); while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000 - && (!readConsoleInput || !in.ready())) - { + && (!readConsoleInput || !in.ready())) { Thread.sleep(200); } //------------- handle interactive command by user. ---------------------- @@ -458,10 +458,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine break label; case "help": - System.err.println(HELP); + System.err.println(help); break; default: - System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP); + System.err.println("Unknown command '" + command + "'. Showing help: \n" + help); break; } } @@ -471,7 +471,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine break; } } - } catch(Exception e) { + } catch (Exception e) { LOG.warn("Exception while running the interactive command line interface", e); } } @@ -493,7 +493,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine public boolean isActive(CommandLine commandLine, Configuration configuration) { String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); boolean yarnJobManager = ID.equals(jobManagerOption); - boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt()); + boolean yarnAppId = commandLine.hasOption(applicationId.getOpt()); return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null; } @@ -504,14 +504,14 @@ public class FlinkYarnSessionCli implements CustomCommandLine @Override public void addRunOptions(Options baseOptions) { - for (Object option : ALL_OPTIONS.getOptions()) { + for (Object option : allOptions.getOptions()) { baseOptions.addOption((Option) option); } } @Override public void addGeneralOptions(Options baseOptions) { - baseOptions.addOption(APPLICATION_ID); + baseOptions.addOption(applicationId); } @Override @@ -520,13 +520,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine Configuration config) throws 
UnsupportedOperationException { // first check for an application id, then try to load from yarn properties - String applicationID = cmdLine.hasOption(APPLICATION_ID.getOpt()) ? - cmdLine.getOptionValue(APPLICATION_ID.getOpt()) + String applicationID = cmdLine.hasOption(applicationId.getOpt()) ? + cmdLine.getOptionValue(applicationId.getOpt()) : loadYarnPropertiesFile(cmdLine, config); - if(null != applicationID) { - String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ? - cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()) + if (null != applicationID) { + String zkNamespace = cmdLine.hasOption(zookeeperNamespace.getOpt()) ? + cmdLine.getOptionValue(zookeeperNamespace.getOpt()) : config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID); config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); @@ -570,39 +570,39 @@ public class FlinkYarnSessionCli implements CustomCommandLine CommandLine cmd; try { cmd = parser.parse(options, args); - } catch(Exception e) { + } catch (Exception e) { System.out.println(e.getMessage()); printUsage(); return 1; } // Query cluster for metrics - if (cmd.hasOption(QUERY.getOpt())) { + if (cmd.hasOption(query.getOpt())) { AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); String description; try { description = yarnDescriptor.getClusterDescription(); } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage()); + System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage()); e.printStackTrace(System.err); return 1; } System.out.println(description); return 0; - } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { + } else if (cmd.hasOption(applicationId.getOpt())) { AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); //configure ZK namespace depending on the value passed - String zkNamespace = cmd.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ? 
- cmd.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()) - :yarnDescriptor.getFlinkConfiguration() - .getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(APPLICATION_ID.getOpt())); + String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? + cmd.getOptionValue(zookeeperNamespace.getOpt()) + : yarnDescriptor.getFlinkConfiguration() + .getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(applicationId.getOpt())); LOG.info("Going to use the ZK namespace: {}", zkNamespace); yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); try { - yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt())); + yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); } catch (Exception e) { throw new RuntimeException("Could not retrieve existing Yarn application", e); } @@ -610,7 +610,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine if (detachedMode) { LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + APPLICATION_ID.getOpt()); + "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { runInteractiveCli(yarnCluster, true); @@ -629,7 +629,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine try { yarnCluster = yarnDescriptor.deploy(); } catch (Exception e) { - System.err.println("Error while deploying YARN cluster: "+e.getMessage()); + System.err.println("Error while deploying YARN cluster: " + e.getMessage()); e.printStackTrace(System.err); return 1; } http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index 8839c1e..28ef2ab 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -24,7 +24,8 @@ import static org.apache.flink.configuration.ConfigOptions.key; /** * This class holds configuration constants used by Flink's YARN runners. - * These options are not expected to be ever configured by users explicitly. + * + *

These options are not expected to be ever configured by users explicitly. */ public class YarnConfigOptions { @@ -50,11 +51,11 @@ public class YarnConfigOptions { public static final ConfigOption CLASSPATH_INCLUDE_USER_JAR = key("yarn.per-job-cluster.include-user-jar") .defaultValue("ORDER"); - + // ------------------------------------------------------------------------ - /** This class is not meant to be instantiated */ + /** This class is not meant to be instantiated. */ private YarnConfigOptions() {} /** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */ http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java index f81d040..d94921e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java @@ -39,20 +39,20 @@ import java.io.IOException; import java.net.URI; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** * The basis of {@link HighAvailabilityServices} for YARN setups. * These high-availability services auto-configure YARN's HDFS and the YARN application's * working directory to be used to store job recovery data. - * + * *

<p>Note for implementers: This class locks access to and creation of services, * to make sure all services are properly shut down when shutting down this class. * To participate in the checks, overriding methods should frame method body with * calls to {@code enter()} and {@code exit()} as shown in the following pattern: - * + * * <pre>{@code
  * public LeaderRetrievalService getResourceManagerLeaderRetriever() {
  *     enter();
@@ -67,21 +67,21 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public abstract class YarnHighAvailabilityServices implements HighAvailabilityServices {
 
-	/** The name of the sub directory in which Flink stores the recovery data */
+	/** The name of the sub directory in which Flink stores the recovery data. */
 	public static final String FLINK_RECOVERY_DATA_DIR = "flink_recovery_data";
 
-	/** Logger for these services, shared with subclasses */
+	/** Logger for these services, shared with subclasses. */
 	protected static final Logger LOG = LoggerFactory.getLogger(YarnHighAvailabilityServices.class);
 
 	// ------------------------------------------------------------------------
 
-	/** The lock that guards all accesses to methods in this class */
+	/** The lock that guards all accesses to methods in this class. */
 	private final ReentrantLock lock;
 
-	/** The Flink FileSystem object that represent the HDFS used by YARN */
+	/** The Flink FileSystem object that represent the HDFS used by YARN. */
 	protected final FileSystem flinkFileSystem;
 
-	/** The Hadoop FileSystem object that represent the HDFS used by YARN */
+	/** The Hadoop FileSystem object that represent the HDFS used by YARN. */
 	protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem;
 
 	/** The working directory of this YARN application.
@@ -89,13 +89,13 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	protected final Path workingDirectory;
 
 	/** The directory for HA persistent data. This should be deleted when the
-	 * HA services clean up */
+	 * HA services clean up. */
 	protected final Path haDataDirectory;
 
-	/** Blob store service to be used for the BlobServer and BlobCache */
+	/** Blob store service to be used for the BlobServer and BlobCache. */
 	protected final BlobStoreService blobStoreService;
 
-	/** Flag marking this instance as shut down */
+	/** Flag marking this instance as shut down. */
 	private volatile boolean closed;
 
 	// ------------------------------------------------------------------------
@@ -103,13 +103,13 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	/**
 	 * Creates new YARN high-availability services, configuring the file system and recovery
 	 * data directory based on the working directory in the given Hadoop configuration.
-	 * 
+	 *
 	 * 

This class requires that the default Hadoop file system configured in the given * Hadoop configuration is an HDFS. - * + * * @param config The Flink configuration of this component / process. * @param hadoopConf The Hadoop configuration for the YARN cluster. - * + * * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails. */ protected YarnHighAvailabilityServices( @@ -280,7 +280,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe /** * Acquires the lock and checks whether the services are already closed. If they are * already closed, the method releases the lock and returns {@code false}. - * + * * @return True, if the lock was acquired and the services are not closed, false if the services are closed. */ boolean enterUnlessClosed() { @@ -307,12 +307,12 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe /** * Creates the high-availability services for a single-job Flink YARN application, to be * used in the Application Master that runs both ResourceManager and JobManager. - * + * * @param flinkConfig The Flink configuration. * @param hadoopConfig The Hadoop configuration for the YARN cluster. - * + * * @return The created high-availability services. - * + * * @throws IOException Thrown, if the high-availability services could not be initialized. 
*/ public static YarnHighAvailabilityServices forSingleJobAppMaster( http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java index abfdb5c..accf8d5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java @@ -34,7 +34,7 @@ import java.util.concurrent.Executors; /** * These YarnHighAvailabilityServices are for the Application Master in setups where there is one * ResourceManager that is statically configured in the Flink configuration. - * + * *

Handled failure types

*
    *
  • User code & operator failures: Failed operators are recovered from checkpoints.
  • @@ -51,11 +51,11 @@ import java.util.concurrent.Executors; *

    Internally, these services put their recovery data into YARN's working directory, * except for checkpoints, which are in the configured checkpoint directory. That way, * checkpoints can be resumed with a new job/application, even if the complete YARN application - * is killed and cleaned up. + * is killed and cleaned up. * *

    Because ResourceManager and JobManager run both in the same process (Application Master), they * use an embedded leader election service to find each other. - * + * *

    A typical YARN setup that uses these HA services first starts the ResourceManager * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures @@ -65,21 +65,21 @@ import java.util.concurrent.Executors; */ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices { - /** The dispatcher thread pool for these services */ + /** The dispatcher thread pool for these services. */ private final ExecutorService dispatcher; - /** The embedded leader election service used by JobManagers to find the resource manager */ + /** The embedded leader election service used by JobManagers to find the resource manager. */ private final SingleLeaderElectionService resourceManagerLeaderElectionService; // ------------------------------------------------------------------------ /** * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration. - * - * This constructor initializes access to the HDFS to store recovery data, and creates the + * + *

    This constructor initializes access to the HDFS to store recovery data, and creates the * embedded leader election services through which ResourceManager and JobManager find and * confirm each other. - * + * * @param config The Flink configuration of this component / process. * @param hadoopConf The Hadoop configuration for the YARN cluster. * http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java index 9d05bbe..ae8f05b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java @@ -35,7 +35,7 @@ import java.io.IOException; /** * These YarnHighAvailabilityServices are for use by the TaskManager in setups, * where there is one ResourceManager that is statically configured in the Flink configuration. - * + * *

    Handled failure types

    *
      *
    • User code & operator failures: Failed operators are recovered from checkpoints.
    • @@ -52,7 +52,7 @@ import java.io.IOException; *

      Internally, these services put their recovery data into YARN's working directory, * except for checkpoints, which are in the configured checkpoint directory. That way, * checkpoints can be resumed with a new job/application, even if the complete YARN application - * is killed and cleaned up. + * is killed and cleaned up. * *

      A typical YARN setup that uses these HA services first starts the ResourceManager * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which @@ -63,7 +63,7 @@ import java.io.IOException; */ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServices { - /** The RPC URL under which the single ResourceManager can be reached while available */ + /** The RPC URL under which the single ResourceManager can be reached while available. */ private final String resourceManagerRpcUrl; // ------------------------------------------------------------------------ @@ -72,7 +72,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi * Creates new YarnPreConfiguredMasterHaServices for the given Flink and YARN configuration. * This constructor parses the ResourceManager address from the Flink configuration and sets * up the HDFS access to store recovery data in the YARN application's working directory. - * + * * @param config The Flink configuration of this component / process. * @param hadoopConf The Hadoop configuration for the YARN cluster. 
* @@ -97,7 +97,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi final int rmPort = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT); if (rmHost == null) { - throw new IllegalConfigurationException("Config parameter '" + + throw new IllegalConfigurationException("Config parameter '" + YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing."); } if (rmPort < 0) { @@ -105,7 +105,7 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing."); } if (rmPort <= 0 || rmPort >= 65536) { - throw new IllegalConfigurationException("Invalid value for '" + + throw new IllegalConfigurationException("Invalid value for '" + YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]"); } http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java index 2648e44..f11063f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersAllocated.java @@ -19,6 +19,7 @@ package org.apache.flink.yarn.messages; import org.apache.flink.yarn.YarnFlinkResourceManager; + import org.apache.hadoop.yarn.api.records.Container; import java.util.List; @@ -26,17 +27,17 @@ import java.util.List; /** * Message sent by the callback handler to the {@link YarnFlinkResourceManager} * to notify it that a set of new containers is available. - * - * NOTE: This message is not serializable, because the Container object is not serializable. + * + *

      NOTE: This message is not serializable, because the Container object is not serializable. */ public class ContainersAllocated { - + private final List containers; - + public ContainersAllocated(List containers) { this.containers = containers; } - + public List containers() { return containers; } http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java index 65bafbc..5b43835 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/messages/ContainersComplete.java @@ -18,8 +18,8 @@ package org.apache.flink.yarn.messages; - import org.apache.flink.yarn.YarnFlinkResourceManager; + import org.apache.hadoop.yarn.api.records.ContainerStatus; import java.util.List; @@ -27,17 +27,17 @@ import java.util.List; /** * Message sent by the callback handler to the {@link YarnFlinkResourceManager} * to notify it that a set of new containers is complete. - * - * NOTE: This message is not serializable, because the ContainerStatus object is not serializable. + * + *

      NOTE: This message is not serializable, because the ContainerStatus object is not serializable. */ public class ContainersComplete { - + private final List containers; - + public ContainersComplete(List containers) { this.containers = containers; } - + public List containers() { return containers; } http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/resources/log4j.properties b/flink-yarn/src/main/resources/log4j.properties index 749796f..b2ad0d3 100644 --- a/flink-yarn/src/main/resources/log4j.properties +++ b/flink-yarn/src/main/resources/log4j.properties @@ -16,7 +16,6 @@ # limitations under the License. ################################################################################ - # Convenience file for local debugging of the JobManager/TaskManager. log4j.rootLogger=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index 35d5f56..9ac96a3 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -25,11 +25,11 @@ import grizzled.slf4j.Logger import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} -import org.apache.flink.runtime.{LeaderSessionMessageFilter, FlinkActor, LogMessages} +import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} 
import org.apache.flink.yarn.YarnMessages._ + import scala.collection.mutable import scala.concurrent.duration._ - import scala.language.postfixOps /** Actor which is responsible to repeatedly poll the Yarn cluster status from the ResourceManager. @@ -187,7 +187,7 @@ class ApplicationClient( // locally forward messages case LocalGetYarnMessage => - if(messagesQueue.nonEmpty) { + if (messagesQueue.nonEmpty) { sender() ! decorateMessage(Option(messagesQueue.dequeue())) } else { sender() ! decorateMessage(None) http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index e094bb7..d78b390 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -38,7 +38,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry import scala.concurrent.duration._ import scala.language.postfixOps - /** JobManager actor for execution on Yarn. It enriches the [[JobManager]] with additional messages * to start/administer/stop the Yarn session. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala index ada2631..1636e09 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala @@ -18,13 +18,12 @@ package org.apache.flink.yarn -import java.util.{Date, UUID, List => JavaList} +import java.util.{UUID, List => JavaList} -import org.apache.flink.api.common.JobID import org.apache.flink.runtime.clusterframework.ApplicationStatus import org.apache.flink.runtime.messages.RequiresLeaderSessionID import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, FinalApplicationStatus} +import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus} import scala.concurrent.duration.{Deadline, FiniteDuration} http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index b7f4c9a..e37ff6f 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -23,9 +23,9 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} 
import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} /** An extension of the TaskManager that listens for additional YARN related * messages. http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java index f03c604..d283c3b 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java @@ -18,7 +18,6 @@ package org.apache.flink.yarn; -import akka.actor.ActorRef; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -26,6 +25,8 @@ import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; + +import akka.actor.ActorRef; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; @@ -35,6 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import java.util.Comparator; import java.util.PriorityQueue; +/** + * A test extension to the {@link YarnFlinkResourceManager} that can handle additional test messages. 
+ */ public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager { private final PriorityQueue> waitingQueue = new PriorityQueue<>(32, new Comparator>() { http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index a09c5b2..a5ec176 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -18,11 +18,6 @@ package org.apache.flink.yarn; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.testkit.JavaTestKit; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -36,6 +31,12 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -53,11 +54,6 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; import 
java.util.ArrayList; import java.util.Collections; @@ -66,11 +62,20 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for {@link Utils}. + */ public class UtilsTest extends TestLogger { private static ActorSystem system; @@ -99,7 +104,7 @@ public class UtilsTest extends TestLogger { String applicationMasterHostName = "localhost"; String webInterfaceURL = "foobar"; ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters( - 1l, 1l, 1l, 1, new HashMap()); + 1L, 1L, 1L, 1, new HashMap()); ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class); int yarnHeartbeatIntervalMillis = 1000; int maxFailedContainers = 10; @@ -203,7 +208,7 @@ public class UtilsTest extends TestLogger { expectMsgClass(deadline.timeLeft(), Acknowledge.class); } - Future numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft()); + Future numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft()); int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java 
index 4884dd0..19a0352 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java @@ -18,10 +18,11 @@ package org.apache.flink.yarn; -import com.google.common.collect.ImmutableMap; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.OperatingSystem; + +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assume; @@ -38,7 +39,12 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.util.Map; -import static org.apache.flink.yarn.YarnConfigKeys.*; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; @@ -46,6 +52,9 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +/** + * Tests for the {@link YarnApplicationMasterRunner}. + */ public class YarnApplicationMasterRunnerTest { private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class); @@ -81,7 +90,7 @@ public class YarnApplicationMasterRunnerTest { } }).when(yarnConf).getStrings(anyString(), Mockito. anyVararg()); - Map env = ImmutableMap. builder() + Map env = ImmutableMap. 
builder() .put(ENV_APP_ID, "foo") .put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()) .put(ENV_CLIENT_SHIP_FILES, "") http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index a7204da..9326723 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.yarn; import org.apache.flink.configuration.ConfigConstants; @@ -22,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.junit.Before; @@ -32,9 +34,12 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; -import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +/** + * Tests for the {@link YarnClusterDescriptor}. 
+ */ public class YarnClusterDescriptorTest { @Rule @@ -140,7 +145,7 @@ public class YarnClusterDescriptorTest { assertEquals( java + " " + jvmmem + - " " + " " + krb5 +// jvmOpts + " " + " " + krb5 + // jvmOpts " " + // logging " " + mainClass + " " + args + " " + redirects, clusterDescriptor @@ -159,7 +164,7 @@ public class YarnClusterDescriptorTest { assertEquals( java + " " + jvmmem + - " " + " " + krb5 +// jvmOpts + " " + " " + krb5 + // jvmOpts " " + logfile + " " + logback + " " + mainClass + " " + args + " " + redirects, clusterDescriptor @@ -178,7 +183,7 @@ public class YarnClusterDescriptorTest { assertEquals( java + " " + jvmmem + - " " + " " + krb5 +// jvmOpts + " " + " " + krb5 + // jvmOpts " " + logfile + " " + log4j + " " + mainClass + " " + args + " " + redirects, clusterDescriptor @@ -197,7 +202,7 @@ public class YarnClusterDescriptorTest { assertEquals( java + " " + jvmmem + - " " + " " + krb5 +// jvmOpts + " " + " " + krb5 + // jvmOpts " " + logfile + " " + logback + " " + log4j + " " + mainClass + " " + args + " " + redirects, clusterDescriptor @@ -210,16 +215,16 @@ public class YarnClusterDescriptorTest { java + " " + jvmmem + " " + jvmOpts + " " + logfile + " " + logback + " " + log4j + - " " + mainClass + " " + args + " "+ redirects, + " " + mainClass + " " + args + " " + redirects, clusterDescriptor .setupApplicationMasterContainer(true, true, false) .getCommands().get(0)); assertEquals( java + " " + jvmmem + - " " + jvmOpts + " " + krb5 +// jvmOpts + " " + jvmOpts + " " + krb5 + // jvmOpts " " + logfile + " " + logback + " " + log4j + - " " + mainClass + " " + args + " "+ redirects, + " " + mainClass + " " + args + " " + redirects, clusterDescriptor .setupApplicationMasterContainer(true, true, true) .getCommands().get(0)); @@ -230,16 +235,16 @@ public class YarnClusterDescriptorTest { java + " " + jvmmem + " " + jvmOpts + " " + jmJvmOpts + " " + logfile + " " + logback + " " + log4j + - " " + mainClass + " " + args + " "+ redirects, + " " + 
mainClass + " " + args + " " + redirects, clusterDescriptor .setupApplicationMasterContainer(true, true, false) .getCommands().get(0)); assertEquals( java + " " + jvmmem + - " " + jvmOpts + " " + jmJvmOpts + " " + krb5 +// jvmOpts + " " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts " " + logfile + " " + logback + " " + log4j + - " " + mainClass + " " + args + " "+ redirects, + " " + mainClass + " " + args + " " + redirects, clusterDescriptor .setupApplicationMasterContainer(true, true, true) .getCommands().get(0)); http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java index b4d2ba8..bdc7863 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java @@ -26,10 +26,9 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.StringUtils; - import org.apache.flink.util.TestLogger; -import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.AfterClass; import org.junit.Assume; import org.junit.Before; @@ -37,7 +36,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -53,6 +51,9 @@ import static org.mockito.Mockito.timeout; import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** + * Tests for YarnIntraNonHaMasterServices. + */ public class YarnIntraNonHaMasterServicesTest extends TestLogger { private static final Random RND = new Random(); @@ -60,9 +61,9 @@ public class YarnIntraNonHaMasterServicesTest extends TestLogger { @ClassRule public static final TemporaryFolder TEMP_DIR = new TemporaryFolder(); - private static MiniDFSCluster HDFS_CLUSTER; + private static MiniDFSCluster hdfsCluster; - private static Path HDFS_ROOT_PATH; + private static Path hdfsRootPath; private org.apache.hadoop.conf.Configuration hadoopConfig; @@ -80,23 +81,23 @@ public class YarnIntraNonHaMasterServicesTest extends TestLogger { hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath()); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); - HDFS_CLUSTER = builder.build(); - HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI()); + hdfsCluster = builder.build(); + hdfsRootPath = new Path(hdfsCluster.getURI()); } @AfterClass public static void destroyHDFS() { - if (HDFS_CLUSTER != null) { - HDFS_CLUSTER.shutdown(); + if (hdfsCluster != null) { + hdfsCluster.shutdown(); } - HDFS_CLUSTER = null; - HDFS_ROOT_PATH = null; + hdfsCluster = null; + hdfsRootPath = null; } @Before public void initConfig() { hadoopConfig = new org.apache.hadoop.conf.Configuration(); - hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString()); + hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString()); } // ------------------------------------------------------------------------