From: zjshen@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 17 Apr 2015 22:35:45 -0000
Subject: [17/50] [abbrv] hadoop git commit: YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support for new resource types on the NM. Contributed by Sidharta Seethana.

YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support for new resource types on the NM. Contributed by Sidharta Seethana.
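To illustrate the intent of the new subsystem: a handler for a new resource type only needs to implement the five lifecycle methods of the ResourceHandler interface and be added to a ResourceHandlerChain, both of which are introduced by the patch below. A minimal sketch follows; the class name NoopDiskResourceHandler and its no-op behavior are illustrative assumptions, while the interface, the chain, and PrivilegedOperation come from this patch.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;

//Hypothetical handler for a new resource type; it returns no privileged
//operations, so the chain simply has nothing to execute at each stage.
public class NoopDiskResourceHandler implements ResourceHandler {
  @Override
  public List<PrivilegedOperation> bootstrap(Configuration configuration)
      throws ResourceHandlerException {
    return Collections.emptyList(); //e.g mount/verify a cgroup controller here
  }

  @Override
  public List<PrivilegedOperation> preStart(Container container)
      throws ResourceHandlerException {
    return Collections.emptyList(); //e.g create a cgroup for the container
  }

  @Override
  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
      throws ResourceHandlerException {
    return Collections.emptyList();
  }

  @Override
  public List<PrivilegedOperation> postComplete(ContainerId containerId)
      throws ResourceHandlerException {
    return Collections.emptyList(); //e.g delete the container's cgroup
  }

  @Override
  public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
    return Collections.emptyList();
  }

  //Handlers are composed into a chain; the operations they return can then
  //be squashed and executed via PrivilegedOperationExecutor.
  public static List<PrivilegedOperation> bootstrapChain(Configuration conf)
      throws ResourceHandlerException {
    ResourceHandlerChain chain = new ResourceHandlerChain(
        Arrays.<ResourceHandler>asList(new NoopDiskResourceHandler()));
    return chain.bootstrap(conf);
  }
}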
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae7a5ffd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae7a5ffd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae7a5ffd Branch: refs/heads/YARN-2928 Commit: ae7a5ffdc2a9f1a09675f0d5fff10481bdc59e77 Parents: 0b8b407 Author: Junping Du Authored: Mon Apr 13 18:35:56 2015 -0700 Committer: Zhijie Shen Committed: Fri Apr 17 15:29:42 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../linux/privileged/PrivilegedOperation.java | 119 +++++ .../PrivilegedOperationException.java | 43 ++ .../privileged/PrivilegedOperationExecutor.java | 255 +++++++++++ .../linux/resources/CGroupsHandler.java | 132 ++++++ .../linux/resources/CGroupsHandlerImpl.java | 436 +++++++++++++++++++ .../linux/resources/ResourceHandler.java | 91 ++++ .../linux/resources/ResourceHandlerChain.java | 142 ++++++ .../resources/ResourceHandlerException.java | 47 ++ .../TestPrivilegedOperationExecutor.java | 233 ++++++++++ .../linux/resources/TestCGroupsHandlerImpl.java | 235 ++++++++++ 11 files changed, 1736 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1214dfd..517b55f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -128,6 +128,9 @@ Release 2.8.0 - UNRELEASED YARN-3347. Improve YARN log command to get AMContainer logs as well as running containers logs. (Xuan Gong via junping_du) + YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support + for new resource types on the NM. (Sidharta Seethana via junping_du) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java new file mode 100644 index 0000000..74556a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java @@ -0,0 +1,119 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Represents operations that require higher system privileges - e.g + * creating cgroups, launching containers as specified users, 'tc' commands etc + * that are completed using the container-executor binary + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PrivilegedOperation { + + public enum OperationType { + CHECK_SETUP("--checksetup"), + MOUNT_CGROUPS("--mount-cgroups"), + INITIALIZE_CONTAINER(""), //no CLI switch supported yet + LAUNCH_CONTAINER(""), //no CLI switch supported yet + SIGNAL_CONTAINER(""), //no CLI switch supported yet + DELETE_AS_USER(""), //no CLI switch supported yet + TC_MODIFY_STATE("--tc-modify-state"), + TC_READ_STATE("--tc-read-state"), + TC_READ_STATS("--tc-read-stats"), + ADD_PID_TO_CGROUP(""); //no CLI switch supported yet. + + private final String option; + + OperationType(String option) { + this.option = option; + } + + public String getOption() { + return option; + } + } + + public static final String CGROUP_ARG_PREFIX = "cgroups="; + + private final OperationType opType; + private final List<String> args; + + public PrivilegedOperation(OperationType opType, String arg) { + this.opType = opType; + this.args = new ArrayList<String>(); + + if (arg != null) { + this.args.add(arg); + } + } + + public PrivilegedOperation(OperationType opType, List<String> args) { + this.opType = opType; + this.args = new ArrayList<String>(); + + if (args != null) { + this.args.addAll(args); + } + } + + public void appendArgs(String... args) { + for (String arg : args) { + this.args.add(arg); + } + } + + public void appendArgs(List<String> args) { + this.args.addAll(args); + } + + public OperationType getOperationType() { + return opType; + } + + public List<String> getArguments() { + return Collections.unmodifiableList(this.args); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof PrivilegedOperation)) { + return false; + } + + PrivilegedOperation otherOp = (PrivilegedOperation) other; + + return otherOp.opType.equals(opType) && otherOp.args.equals(args); + } + + @Override + public int hashCode() { + return opType.hashCode() + 97 * args.hashCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationException.java new file mode 100644 index 0000000..20c234d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationException.java @@ -0,0 +1,43 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class PrivilegedOperationException extends YarnException { + private static final long serialVersionUID = 1L; + + public PrivilegedOperationException() { + super(); + } + + public PrivilegedOperationException(String message) { + super(message); + } + + public PrivilegedOperationException(Throwable cause) { + super(cause); + } + + public PrivilegedOperationException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java new file mode 100644 index 0000000..1c4a51c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java @@ -0,0 +1,255 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.util.Shell.ExitCodeException; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Provides mechanisms to execute PrivilegedOperations + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PrivilegedOperationExecutor { + private static final Log LOG = LogFactory.getLog(PrivilegedOperationExecutor + .class); + private volatile static PrivilegedOperationExecutor instance; + + private String containerExecutorExe; + + public static String getContainerExecutorExecutablePath(Configuration conf) { + String yarnHomeEnvVar = + System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key()); + File hadoopBin = new File(yarnHomeEnvVar, "bin"); + String defaultPath = + new File(hadoopBin, "container-executor").getAbsolutePath(); + return null == conf + ? defaultPath + : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, + defaultPath); + } + + private void init(Configuration conf) { + containerExecutorExe = getContainerExecutorExecutablePath(conf); + } + + private PrivilegedOperationExecutor(Configuration conf) { + init(conf); + } + + public static PrivilegedOperationExecutor getInstance(Configuration conf) { + if (instance == null) { + synchronized (PrivilegedOperationExecutor.class) { + if (instance == null) { + instance = new PrivilegedOperationExecutor(conf); + } + } + } + + return instance; + } + + /** + * @param prefixCommands in some cases ( e.g priorities using nice ), + * prefix commands are necessary + * @param operation the type and arguments for the operation to be + * executed + * @return execution string array for privileged operation + */ + + public String[] getPrivilegedOperationExecutionCommand(List<String> + prefixCommands, + PrivilegedOperation operation) { + List<String> fullCommand = new ArrayList<String>(); + + if (prefixCommands != null && !prefixCommands.isEmpty()) { + fullCommand.addAll(prefixCommands); + } + + fullCommand.add(containerExecutorExe); + fullCommand.add(operation.getOperationType().getOption()); + fullCommand.addAll(operation.getArguments()); + + String[] fullCommandArray = + fullCommand.toArray(new String[fullCommand.size()]); + + if (LOG.isDebugEnabled()) { + LOG.debug("Privileged Execution Command Array: " + + Arrays.toString(fullCommandArray)); + } + + return fullCommandArray; + } + + /** + * Executes a privileged operation. It is up to the callers to ensure that + * each privileged operation's parameters are constructed correctly. The + * parameters are passed verbatim to the container-executor binary. 
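+ * <p> + * Illustrative example (editorial addition, not part of the original patch; + * paths are hypothetical): for an operation of type TC_MODIFY_STATE with the + * single argument /tmp/tc-state.cmds, prefix commands [nice, -10] and a + * container-executor binary at /opt/hadoop/bin/container-executor, the + * command array built by getPrivilegedOperationExecutionCommand is + * [nice, -10, /opt/hadoop/bin/container-executor, --tc-modify-state, + * /tmp/tc-state.cmds]. 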
+ * + * @param prefixCommands in some cases ( e.g priorities using nice ), + * prefix commands are necessary + * @param operation the type and arguments for the operation to be executed + * @param workingDir (optional) working directory for execution + * @param env (optional) env of the command will include specified vars + * @param grabOutput return (possibly large) shell command output + * @return stdout contents from shell executor - useful for some privileged + * operations - e.g --tc_read + * @throws org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException + */ + public String executePrivilegedOperation(List<String> prefixCommands, + PrivilegedOperation operation, File workingDir, + Map<String, String> env, boolean grabOutput) + throws PrivilegedOperationException { + String[] fullCommandArray = getPrivilegedOperationExecutionCommand + (prefixCommands, operation); + ShellCommandExecutor exec = new ShellCommandExecutor(fullCommandArray, + workingDir, env); + + try { + exec.execute(); + if (LOG.isDebugEnabled()) { + LOG.debug("Privileged Execution Operation Output:"); + LOG.debug(exec.getOutput()); + } + } catch (ExitCodeException e) { + String logLine = new StringBuffer("Shell execution returned exit code: ") + .append(exec.getExitCode()) + .append(". Privileged Execution Operation Output: ") + .append(System.lineSeparator()).append(exec.getOutput()).toString(); + + LOG.warn(logLine); + throw new PrivilegedOperationException(e); + } catch (IOException e) { + LOG.warn("IOException executing command: ", e); + throw new PrivilegedOperationException(e); + } + + if (grabOutput) { + return exec.getOutput(); + } + + return null; + } + + /** + * Executes a privileged operation. It is up to the callers to ensure that + * each privileged operation's parameters are constructed correctly. The + * parameters are passed verbatim to the container-executor binary. + * + * @param operation the type and arguments for the operation to be executed + * @param grabOutput return (possibly large) shell command output + * @return stdout contents from shell executor - useful for some privileged + * operations - e.g --tc_read + * @throws org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException + */ + public String executePrivilegedOperation(PrivilegedOperation operation, + boolean grabOutput) throws PrivilegedOperationException { + return executePrivilegedOperation(null, operation, null, null, grabOutput); + } + + //Utility functions for squashing together operations in supported ways + //At some point, we need to create a generalized mechanism that uses a set + //of squashing 'rules' to squash a set of PrivilegedOperations of varying + //types - e.g Launch Container + Add Pid to CGroup(s) + TC rules + + /** + * Squash operations for cgroups - e.g mount, add pid to cgroup etc. + * For now, we only implement squashing for 'add pid to cgroup' since this + * is the only optimization relevant to launching containers + * + * @return single squashed cgroup operation. Null on failure. 
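+ * <p> + * Example (editorial illustration, not part of the original patch; paths + * are hypothetical): squashing three ADD_PID_TO_CGROUP operations carrying + * the arguments cgroups=/sys/fs/cgroup/cpu/hadoop-yarn/container_01/tasks, + * cgroups=none and + * cgroups=/sys/fs/cgroup/net_cls/hadoop-yarn/container_01/tasks yields a + * single operation with the argument + * cgroups=/sys/fs/cgroup/cpu/hadoop-yarn/container_01/tasks,/sys/fs/cgroup/net_cls/hadoop-yarn/container_01/tasks + * - 'none' entries are dropped unless no tasks files remain at all, in + * which case the result is cgroups=none. 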
+ */ + + public static PrivilegedOperation squashCGroupOperations + (List<PrivilegedOperation> ops) throws PrivilegedOperationException { + if (ops.size() == 0) { + return null; + } + + StringBuffer finalOpArg = new StringBuffer(PrivilegedOperation + .CGROUP_ARG_PREFIX); + boolean noneArgsOnly = true; + + for (PrivilegedOperation op : ops) { + if (!op.getOperationType() + .equals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP)) { + LOG.warn("Unsupported operation type: " + op.getOperationType()); + throw new PrivilegedOperationException("Unsupported operation type:" + + op.getOperationType()); + } + + List<String> args = op.getArguments(); + if (args.size() != 1) { + LOG.warn("Invalid number of args: " + args.size()); + throw new PrivilegedOperationException("Invalid number of args: " + + args.size()); + } + + String arg = args.get(0); + String tasksFile = StringUtils.substringAfter(arg, + PrivilegedOperation.CGROUP_ARG_PREFIX); + if (tasksFile == null || tasksFile.isEmpty()) { + LOG.warn("Invalid argument: " + arg); + throw new PrivilegedOperationException("Invalid argument: " + arg); + } + + if (tasksFile.equals("none")) { + //Don't append to finalOpArg + continue; + } + + if (noneArgsOnly == false) { + //We have already appended at least one tasks file. + finalOpArg.append(","); + finalOpArg.append(tasksFile); + } else { + finalOpArg.append(tasksFile); + noneArgsOnly = false; + } + } + + if (noneArgsOnly) { + finalOpArg.append("none"); //there were no tasks file to append + } + + PrivilegedOperation finalOp = new PrivilegedOperation( + PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, finalOpArg + .toString()); + + return finalOp; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java new file mode 100644 index 0000000..34429d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java @@ -0,0 +1,132 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Provides CGroups functionality. Implementations are expected to be + * thread-safe + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface CGroupsHandler { + public enum CGroupController { + CPU("cpu"), + NET_CLS("net_cls"); + + private final String name; + + CGroupController(String name) { + this.name = name; + } + + String getName() { + return name; + } + } + + public static final String CGROUP_FILE_TASKS = "tasks"; + public static final String CGROUP_PARAM_CLASSID = "classid"; + + /** + * Mounts a cgroup controller + * @param controller - the controller being mounted + * @throws ResourceHandlerException + */ + public void mountCGroupController(CGroupController controller) + throws ResourceHandlerException; + + /** + * Creates a cgroup for a given controller + * @param controller - controller type for which the cgroup is being created + * @param cGroupId - id of the cgroup being created + * @return full path to created cgroup + * @throws ResourceHandlerException + */ + public String createCGroup(CGroupController controller, String cGroupId) + throws ResourceHandlerException; + + /** + * Deletes the specified cgroup + * @param controller - controller type for the cgroup + * @param cGroupId - id of the cgroup being deleted + * @throws ResourceHandlerException + */ + public void deleteCGroup(CGroupController controller, String cGroupId) throws + ResourceHandlerException; + + /** + * Gets the full path for the cgroup, given a controller and a cgroup id + * @param controller - controller type for the cgroup + * @param cGroupId - id of the cgroup + * @return full path for the cgroup + */ + public String getPathForCGroup(CGroupController controller, String + cGroupId); + + /** + * Gets the full path for the cgroup's tasks file, given a controller and a + * cgroup id + * @param controller - controller type for the cgroup + * @param cGroupId - id of the cgroup + * @return full path for the cgroup's tasks file + */ + public String getPathForCGroupTasks(CGroupController controller, String + cGroupId); + + /** + * Gets the full path for a cgroup parameter, given a controller, + * cgroup id and parameter name + * @param controller - controller type for the cgroup + * @param cGroupId - id of the cgroup + * @param param - cgroup parameter ( e.g classid ) + * @return full path for the cgroup parameter + */ + public String getPathForCGroupParam(CGroupController controller, String + cGroupId, String param); + + /** + * updates a cgroup parameter, given a controller, cgroup id, parameter name + * and a parameter value + * @param controller - controller type for the cgroup + * @param cGroupId - id of the cgroup + * @param param - cgroup parameter ( e.g classid ) + * @param value - value to be written to the parameter file + * @throws ResourceHandlerException + */ + public void updateCGroupParam(CGroupController controller, String cGroupId, + String param, String value) throws ResourceHandlerException; + + /** + * reads a cgroup parameter value, given a controller, cgroup id, parameter + * name + * @param controller - controller type for the cgroup + * @param cGroupId - id of the cgroup + * @param param - cgroup parameter ( e.g classid ) + * @return parameter value as read from the parameter file + * @throws ResourceHandlerException + */ + public String 
getCGroupParam(CGroupController controller, String cGroupId, + String param) throws ResourceHandlerException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java new file mode 100644 index 0000000..9a4230f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java @@ -0,0 +1,436 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Support for interacting with various CGroup subsystems. Thread-safe. 
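+ * <p> + * Illustrative path layout (editorial addition, not part of the original + * patch; the mount point is hypothetical): with the cpu controller mounted + * at /sys/fs/cgroup/cpu and the default hierarchy prefix hadoop-yarn, + * getPathForCGroup(CPU, "container_01") resolves to + * /sys/fs/cgroup/cpu/hadoop-yarn/container_01, the corresponding tasks file + * to /sys/fs/cgroup/cpu/hadoop-yarn/container_01/tasks, and a parameter + * such as "shares" to /sys/fs/cgroup/cpu/hadoop-yarn/container_01/cpu.shares. 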
+ */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class CGroupsHandlerImpl implements CGroupsHandler { + + private static final Log LOG = LogFactory.getLog(CGroupsHandlerImpl.class); + private static final String MTAB_FILE = "/proc/mounts"; + private static final String CGROUPS_FSTYPE = "cgroup"; + + private final String cGroupPrefix; + private final boolean enableCGroupMount; + private final String cGroupMountPath; + private final long deleteCGroupTimeout; + private final long deleteCGroupDelay; + private final Map<CGroupController, String> controllerPaths; + private final ReadWriteLock rwLock; + private final PrivilegedOperationExecutor privilegedOperationExecutor; + private final Clock clock; + + public CGroupsHandlerImpl(Configuration conf, PrivilegedOperationExecutor + privilegedOperationExecutor) throws ResourceHandlerException { + this.cGroupPrefix = conf.get(YarnConfiguration. + NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn") + .replaceAll("^/", "").replaceAll("$/", ""); + this.enableCGroupMount = conf.getBoolean(YarnConfiguration. + NM_LINUX_CONTAINER_CGROUPS_MOUNT, false); + this.cGroupMountPath = conf.get(YarnConfiguration. + NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null); + this.deleteCGroupTimeout = conf.getLong( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT, + YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT); + this.deleteCGroupDelay = + conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY, + YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY); + this.controllerPaths = new HashMap<>(); + this.rwLock = new ReentrantReadWriteLock(); + this.privilegedOperationExecutor = privilegedOperationExecutor; + this.clock = new SystemClock(); + + init(); + } + + private void init() throws ResourceHandlerException { + initializeControllerPaths(); + } + + private String getControllerPath(CGroupController controller) { + try { + rwLock.readLock().lock(); + return controllerPaths.get(controller); + } finally { + rwLock.readLock().unlock(); + } + } + + private void initializeControllerPaths() throws ResourceHandlerException { + if (enableCGroupMount) { + //nothing to do here - we support 'deferred' mounting of specific + //controllers - we'll populate the path for a given controller when an + //explicit mountCGroupController request is issued. + LOG.info("CGroup controller mounting enabled."); + } else { + //cluster admins are expected to have mounted controllers in specific + //locations - we'll attempt to figure out mount points + initializeControllerPathsFromMtab(); + } + } + + private void initializeControllerPathsFromMtab() + throws ResourceHandlerException { + try { + Map<String, List<String>> parsedMtab = parseMtab(); + + //we want to do a bulk update without the paths changing concurrently + rwLock.writeLock().lock(); + + for (CGroupController controller : CGroupController.values()) { + String name = controller.getName(); + String controllerPath = findControllerInMtab(name, parsedMtab); + + if (controllerPath != null) { + File f = new File(controllerPath + "/" + this.cGroupPrefix); + + if (FileUtil.canWrite(f)) { + controllerPaths.put(controller, controllerPath); + } else { + String error = + new StringBuffer("Mount point Based on mtab file: ") + .append(MTAB_FILE).append( + ". Controller mount point not writable for: ") + .append(name).toString(); + + LOG.error(error); + throw new ResourceHandlerException(error); + } + } else { + + LOG.warn("Controller not mounted but automount disabled: " + name); + } + } + } catch (IOException e) { + LOG.warn("Failed to initialize controller paths! Exception: " + e); + throw new ResourceHandlerException( + "Failed to initialize controller paths!"); + } finally { + rwLock.writeLock().unlock(); + } + } + + /* We are looking for entries of the form: + * none /cgroup/path/mem cgroup rw,memory 0 0 + * + * Use a simple pattern that splits on the five spaces, and + * grabs the 2, 3, and 4th fields. + */ + + private static final Pattern MTAB_FILE_FORMAT = Pattern.compile( + "^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$"); + + /* + * Returns a map: path -> mount options + * for mounts with type "cgroup". Cgroup controllers will + * appear in the list of options for a path. + */ + private Map<String, List<String>> parseMtab() throws IOException { + Map<String, List<String>> ret = new HashMap<String, List<String>>(); + BufferedReader in = null; + + try { + FileInputStream fis = new FileInputStream(new File(getMtabFileName())); + in = new BufferedReader(new InputStreamReader(fis, "UTF-8")); + + for (String str = in.readLine(); str != null; + str = in.readLine()) { + Matcher m = MTAB_FILE_FORMAT.matcher(str); + boolean mat = m.find(); + if (mat) { + String path = m.group(1); + String type = m.group(2); + String options = m.group(3); + + if (type.equals(CGROUPS_FSTYPE)) { + List<String> value = Arrays.asList(options.split(",")); + ret.put(path, value); + } + } + } + } catch (IOException e) { + throw new IOException("Error while reading " + getMtabFileName(), e); + } finally { + IOUtils.cleanup(LOG, in); + } + + return ret; + } + + private String findControllerInMtab(String controller, + Map<String, List<String>> entries) { + for (Map.Entry<String, List<String>> e : entries.entrySet()) { + if (e.getValue().contains(controller)) + return e.getKey(); + } + + return null; + } + + String getMtabFileName() { + return MTAB_FILE; + } + + @Override + public void mountCGroupController(CGroupController controller) + throws ResourceHandlerException { + if (!enableCGroupMount) { + LOG.warn("CGroup mounting is disabled - ignoring mount request for: " + + controller.getName()); + return; + } + + String path = getControllerPath(controller); + + if (path == null) { + try { + //lock out other readers/writers till we are done + rwLock.writeLock().lock(); + + String hierarchy = cGroupPrefix; + StringBuffer controllerPath = new StringBuffer() + .append(cGroupMountPath).append('/').append(controller.getName()); + StringBuffer cGroupKV = new StringBuffer() + .append(controller.getName()).append('=').append(controllerPath); + PrivilegedOperation.OperationType opType = PrivilegedOperation + .OperationType.MOUNT_CGROUPS; + PrivilegedOperation op = new PrivilegedOperation(opType, (String) null); + + op.appendArgs(hierarchy, cGroupKV.toString()); + LOG.info("Mounting controller " + controller.getName() + " at " + + controllerPath); + privilegedOperationExecutor.executePrivilegedOperation(op, false); + + //if privileged operation succeeds, update controller paths + controllerPaths.put(controller, controllerPath.toString()); + + return; + } catch (PrivilegedOperationException e) { + LOG.error("Failed to mount controller: " + controller.getName()); + throw new ResourceHandlerException("Failed to mount controller: " + + controller.getName()); + } finally { + rwLock.writeLock().unlock(); + } + } else { + LOG.info("CGroup controller already mounted at: " + path); 
+ return; + } + } + + @Override + public String getPathForCGroup(CGroupController controller, String cGroupId) { + return new StringBuffer(getControllerPath(controller)) + .append('/').append(cGroupPrefix).append("/") + .append(cGroupId).toString(); + } + + @Override + public String getPathForCGroupTasks(CGroupController controller, + String cGroupId) { + return new StringBuffer(getPathForCGroup(controller, cGroupId)) + .append('/').append(CGROUP_FILE_TASKS).toString(); + } + + @Override + public String getPathForCGroupParam(CGroupController controller, + String cGroupId, String param) { + return new StringBuffer(getPathForCGroup(controller, cGroupId)) + .append('/').append(controller.getName()).append('.') + .append(param).toString(); + } + + @Override + public String createCGroup(CGroupController controller, String cGroupId) + throws ResourceHandlerException { + String path = getPathForCGroup(controller, cGroupId); + + if (LOG.isDebugEnabled()) { + LOG.debug("createCgroup: " + path); + } + + if (!new File(path).mkdir()) { + throw new ResourceHandlerException("Failed to create cgroup at " + path); + } + + return path; + } + + /* + * Utility routine to print first line from cgroup tasks file + */ + private void logLineFromTasksFile(File cgf) { + String str; + if (LOG.isDebugEnabled()) { + try (BufferedReader inl = + new BufferedReader(new InputStreamReader(new FileInputStream(cgf + + "/tasks"), "UTF-8"))) { + if ((str = inl.readLine()) != null) { + LOG.debug("First line in cgroup tasks file: " + cgf + " " + str); + } + } catch (IOException e) { + LOG.warn("Failed to read cgroup tasks file. ", e); + } + } + } + + /** + * If tasks file is empty, delete the cgroup. + * + * @param cgf object referring to the cgroup to be deleted + * @return Boolean indicating whether cgroup was deleted + */ + boolean checkAndDeleteCgroup(File cgf) throws InterruptedException { + boolean deleted = false; + // FileInputStream in = null; + try (FileInputStream in = new FileInputStream(cgf + "/tasks")) { + if (in.read() == -1) { + /* + * "tasks" file is empty, sleep a bit more and then try to delete the + * cgroup. Some versions of linux will occasionally panic due to a race + * condition in this area, hence the paranoia. + */ + Thread.sleep(deleteCGroupDelay); + deleted = cgf.delete(); + if (!deleted) { + LOG.warn("Failed attempt to delete cgroup: " + cgf); + } + } else { + logLineFromTasksFile(cgf); + } + } catch (IOException e) { + LOG.warn("Failed to read cgroup tasks file. 
", e); + } + return deleted; + } + + @Override + public void deleteCGroup(CGroupController controller, String cGroupId) + throws ResourceHandlerException { + boolean deleted = false; + String cGroupPath = getPathForCGroup(controller, cGroupId); + + if (LOG.isDebugEnabled()) { + LOG.debug("deleteCGroup: " + cGroupPath); + } + + long start = clock.getTime(); + + do { + try { + deleted = checkAndDeleteCgroup(new File(cGroupPath)); + if (!deleted) { + Thread.sleep(deleteCGroupDelay); + } + } catch (InterruptedException ex) { + // NOP + } + } while (!deleted && (clock.getTime() - start) < deleteCGroupTimeout); + + if (!deleted) { + LOG.warn("Unable to delete " + cGroupPath + + ", tried to delete for " + deleteCGroupTimeout + "ms"); + } + } + + @Override + public void updateCGroupParam(CGroupController controller, String cGroupId, + String param, String value) throws ResourceHandlerException { + String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param); + PrintWriter pw = null; + + if (LOG.isDebugEnabled()) { + LOG.debug( + "updateCGroupParam for path: " + cGroupParamPath + " with value " + + value); + } + + try { + File file = new File(cGroupParamPath); + Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8"); + pw = new PrintWriter(w); + pw.write(value); + } catch (IOException e) { + throw new ResourceHandlerException(new StringBuffer("Unable to write to ") + .append(cGroupParamPath).append(" with value: ").append(value) + .toString(), e); + } finally { + if (pw != null) { + boolean hasError = pw.checkError(); + pw.close(); + if (hasError) { + throw new ResourceHandlerException( + new StringBuffer("Unable to write to ") + .append(cGroupParamPath).append(" with value: ").append(value) + .toString()); + } + if (pw.checkError()) { + throw new ResourceHandlerException("Error while closing cgroup file" + + " " + cGroupParamPath); + } + } + } + } + + @Override + public String getCGroupParam(CGroupController controller, String cGroupId, + String param) throws ResourceHandlerException { + String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param); + + try { + byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath)); + return new String(contents, "UTF-8").trim(); + } catch (IOException e) { + throw new ResourceHandlerException( + "Unable to read from " + cGroupParamPath); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java new file mode 100644 index 0000000..3dfc86b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java @@ -0,0 +1,91 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; + +import java.util.List; + +/** + * Handler interface for resource subsystems' isolation and enforcement. e.g cpu, memory, network, disks etc + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ResourceHandler { + + /** + * Bootstrap resource subsystem. + * + * @return (possibly empty) list of operations that require elevated + * privileges + */ + List<PrivilegedOperation> bootstrap(Configuration configuration) + throws ResourceHandlerException; + + /** + * Prepare a resource environment for container launch + * + * @param container Container being launched + * @return (possibly empty) list of operations that require elevated + * privileges e.g a) create a custom cgroup b) add pid for container to tasks + * file for a cgroup. + * @throws ResourceHandlerException + */ + List<PrivilegedOperation> preStart(Container container) + throws ResourceHandlerException; + + /** + * Reacquire state for a container that was already launched + * + * @param containerId id of the container being reacquired. + * @return (possibly empty) list of operations that require elevated + * privileges + * @throws ResourceHandlerException + */ + + List<PrivilegedOperation> reacquireContainer(ContainerId containerId) + throws ResourceHandlerException; + + /** + * Perform any tasks necessary after container completion + * @param containerId of the container that was completed. + * @return (possibly empty) list of operations that require elevated + * privileges + * @throws ResourceHandlerException + */ + List<PrivilegedOperation> postComplete(ContainerId containerId) throws + ResourceHandlerException; + + /** + * Teardown environment for resource subsystem if requested. This method + * needs to be used with care since it could impact running containers. 
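+ * <p> + * Typical lifecycle (editorial summary of the methods above, not part of + * the original patch): bootstrap() once when the NodeManager starts, + * preStart() before each container launch, postComplete() after each + * container finishes, reacquireContainer() for containers that survive a + * NodeManager restart, and teardown() at shutdown. 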
+ * + * @return (possibly empty) list of operations that require elevated + * privileges + */ + List<PrivilegedOperation> teardown() throws ResourceHandlerException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java new file mode 100644 index 0000000..955d216 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java @@ -0,0 +1,142 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A helper class to delegate functionality to a 'chain' of + * ResourceHandler(s) + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ResourceHandlerChain implements ResourceHandler { + private final List<ResourceHandler> resourceHandlers; + + public ResourceHandlerChain(List<ResourceHandler> resourceHandlers) { + this.resourceHandlers = resourceHandlers; + } + + @Override + public List<PrivilegedOperation> bootstrap(Configuration configuration) + throws ResourceHandlerException { + + List<PrivilegedOperation> allOperations = new + ArrayList<PrivilegedOperation>(); + + for (ResourceHandler resourceHandler : resourceHandlers) { + List<PrivilegedOperation> handlerOperations = + resourceHandler.bootstrap(configuration); + if (handlerOperations != null) { + allOperations.addAll(handlerOperations); + } + + } + return allOperations; + } + + @Override + public List<PrivilegedOperation> preStart(Container container) + throws ResourceHandlerException { + List<PrivilegedOperation> allOperations = new + ArrayList<PrivilegedOperation>(); + + for (ResourceHandler resourceHandler : resourceHandlers) { + List<PrivilegedOperation> handlerOperations = + resourceHandler.preStart(container); + + if (handlerOperations != null) { + allOperations.addAll(handlerOperations); + } + + } + return allOperations; + } + + @Override + public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + List<PrivilegedOperation> allOperations = new + ArrayList<PrivilegedOperation>(); + + for (ResourceHandler resourceHandler : resourceHandlers) { + List<PrivilegedOperation> handlerOperations = + resourceHandler.reacquireContainer(containerId); + + if (handlerOperations != null) { + allOperations.addAll(handlerOperations); + } + + } + return allOperations; + } + + @Override + public List<PrivilegedOperation> postComplete(ContainerId containerId) + throws ResourceHandlerException { + List<PrivilegedOperation> allOperations = new + ArrayList<PrivilegedOperation>(); + + for (ResourceHandler resourceHandler : resourceHandlers) { + List<PrivilegedOperation> handlerOperations = + resourceHandler.postComplete(containerId); + + if (handlerOperations != null) { + allOperations.addAll(handlerOperations); + } + + } + return allOperations; + } + + @Override + public List<PrivilegedOperation> teardown() + throws ResourceHandlerException { + List<PrivilegedOperation> allOperations = new + ArrayList<PrivilegedOperation>(); + + for (ResourceHandler resourceHandler : resourceHandlers) { + List<PrivilegedOperation> handlerOperations = + resourceHandler.teardown(); + + if (handlerOperations != null) { + allOperations.addAll(handlerOperations); + } + + } + return allOperations; + } + + List<ResourceHandler> getResourceHandlerList() { + return Collections.unmodifiableList(resourceHandlers); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerException.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerException.java new file mode 100644 index 0000000..3ab7548 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerException.java @@ -0,0 +1,47 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.exceptions.YarnException; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ResourceHandlerException extends YarnException { + private static final long serialVersionUID = 1L; + + public ResourceHandlerException() { + super(); + } + + public ResourceHandlerException(String message) { + super(message); + } + + public ResourceHandlerException(Throwable cause) { + super(cause); + } + + public ResourceHandlerException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/TestPrivilegedOperationExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/TestPrivilegedOperationExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/TestPrivilegedOperationExecutor.java new file mode 100644 index 0000000..7154d03 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/TestPrivilegedOperationExecutor.java @@ -0,0 +1,233 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestPrivilegedOperationExecutor { + private static final Log LOG = LogFactory + .getLog(TestPrivilegedOperationExecutor.class); + private String localDataDir; + private String customExecutorPath; + private Configuration nullConf = null; + private Configuration emptyConf; + private Configuration confWithExecutorPath; + + private String cGroupTasksNone; + private String cGroupTasksInvalid; + private String cGroupTasks1; + private String cGroupTasks2; + private String cGroupTasks3; + private PrivilegedOperation opDisallowed; + private PrivilegedOperation opTasksNone; + private PrivilegedOperation opTasksInvalid; + private PrivilegedOperation opTasks1; + private PrivilegedOperation opTasks2; + private PrivilegedOperation opTasks3; + + @Before + public void setup() { + localDataDir = System.getProperty("test.build.data"); + customExecutorPath = localDataDir + "/bin/container-executor"; + emptyConf = new YarnConfiguration(); + confWithExecutorPath = new YarnConfiguration(); + confWithExecutorPath.set(YarnConfiguration + .NM_LINUX_CONTAINER_EXECUTOR_PATH, customExecutorPath); + + cGroupTasksNone = "none"; + cGroupTasksInvalid = "invalid_string"; + cGroupTasks1 = "cpu/hadoop_yarn/container_01/tasks"; + cGroupTasks2 = "net_cls/hadoop_yarn/container_01/tasks"; + cGroupTasks3 = "blkio/hadoop_yarn/container_01/tasks"; + opDisallowed = new PrivilegedOperation + (PrivilegedOperation.OperationType.DELETE_AS_USER, (String) null); + opTasksNone = new PrivilegedOperation + (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasksNone); + opTasksInvalid = new PrivilegedOperation + (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + cGroupTasksInvalid); + opTasks1 = new PrivilegedOperation + (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasks1); + opTasks2 = new PrivilegedOperation + (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasks2); + opTasks3 = new PrivilegedOperation + (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasks3); + } + + @Test + public void testExecutorPath() { + String containerExePath = PrivilegedOperationExecutor + .getContainerExecutorExecutablePath(nullConf); + + //In case HADOOP_YARN_HOME isn't set, CWD is 
+    //In case HADOOP_YARN_HOME isn't set, CWD is used. If conf is null or
+    //NM_LINUX_CONTAINER_EXECUTOR_PATH is not set, then a defaultPath is
+    //constructed.
+    String yarnHomeEnvVar = System.getenv("HADOOP_YARN_HOME");
+    String yarnHome = yarnHomeEnvVar != null ? yarnHomeEnvVar
+        : new File("").getAbsolutePath();
+    String expectedPath = yarnHome + "/bin/container-executor";
+
+    Assert.assertEquals(expectedPath, containerExePath);
+
+    containerExePath = PrivilegedOperationExecutor
+        .getContainerExecutorExecutablePath(emptyConf);
+    Assert.assertEquals(expectedPath, containerExePath);
+
+    //if NM_LINUX_CONTAINER_EXECUTOR_PATH is set, this must be returned
+    expectedPath = customExecutorPath;
+    containerExePath = PrivilegedOperationExecutor
+        .getContainerExecutorExecutablePath(confWithExecutorPath);
+    Assert.assertEquals(expectedPath, containerExePath);
+  }
+
+  @Test
+  public void testExecutionCommand() {
+    PrivilegedOperationExecutor exec = PrivilegedOperationExecutor
+        .getInstance(confWithExecutorPath);
+    PrivilegedOperation op = new PrivilegedOperation(PrivilegedOperation
+        .OperationType.LAUNCH_CONTAINER, (String) null);
+    String[] cmdArray = exec.getPrivilegedOperationExecutionCommand(null, op);
+
+    //No arguments added - so the resulting array should consist of
+    //1) full path to executor 2) cli switch
+    Assert.assertEquals(2, cmdArray.length);
+    Assert.assertEquals(customExecutorPath, cmdArray[0]);
+    Assert.assertEquals(op.getOperationType().getOption(), cmdArray[1]);
+
+    //other (dummy) arguments to launch container
+    String[] additionalArgs = { "test_user", "yarn", "1", "app_01",
+        "container_01", "workdir", "launch_script.sh", "tokens", "pidfile",
+        "nm-local-dirs", "nm-log-dirs", "resource-spec" };
+
+    op.appendArgs(additionalArgs);
+    cmdArray = exec.getPrivilegedOperationExecutionCommand(null, op);
+
+    //Resulting array should be of length 2 greater than the number of
+    //additional arguments added.
+
+    Assert.assertEquals(2 + additionalArgs.length, cmdArray.length);
+    Assert.assertEquals(customExecutorPath, cmdArray[0]);
+    Assert.assertEquals(op.getOperationType().getOption(), cmdArray[1]);
+
+    //Rest of args should be same as additional args.
+    for (int i = 0; i < additionalArgs.length; ++i) {
+      Assert.assertEquals(additionalArgs[i], cmdArray[2 + i]);
+    }
+
+    //Now test prefix commands
+    List<String> prefixCommands = Arrays.asList("nice", "-10");
+    cmdArray = exec.getPrivilegedOperationExecutionCommand(prefixCommands, op);
+    int prefixLength = prefixCommands.size();
+    //Resulting array should be of length of prefix command args + 2 (exec
+    //path + switch) + length of additional args.
+    Assert.assertEquals(prefixLength + 2 + additionalArgs.length,
+        cmdArray.length);
+
+    //Prefix command array comes first
+    for (int i = 0; i < prefixLength; ++i) {
+      Assert.assertEquals(prefixCommands.get(i), cmdArray[i]);
+    }
+
+    //Followed by the container executor path and the cli switch
+    Assert.assertEquals(customExecutorPath, cmdArray[prefixLength]);
+    Assert.assertEquals(op.getOperationType().getOption(),
+        cmdArray[prefixLength + 1]);
+
+    //Followed by the rest of the args
+    //Rest of args should be same as additional args.
+    for (int i = 0; i < additionalArgs.length; ++i) {
+      Assert.assertEquals(additionalArgs[i], cmdArray[prefixLength + 2 + i]);
+    }
+  }
+
+  @Test
+  public void testSquashCGroupOperationsWithInvalidOperations() {
+    List<PrivilegedOperation> ops = new ArrayList<>();
+
+    //Ensure that disallowed ops are rejected
+    ops.add(opTasksNone);
+    ops.add(opDisallowed);
+
+    try {
+      PrivilegedOperationExecutor.squashCGroupOperations(ops);
+      Assert.fail("Expected squash operation to fail with an exception!");
+    } catch (PrivilegedOperationException e) {
+      LOG.info("Caught expected exception : " + e);
+    }
+
+    //Ensure that invalid strings are rejected
+    ops.clear();
+    ops.add(opTasksNone);
+    ops.add(opTasksInvalid);
+
+    try {
+      PrivilegedOperationExecutor.squashCGroupOperations(ops);
+      Assert.fail("Expected squash operation to fail with an exception!");
+    } catch (PrivilegedOperationException e) {
+      LOG.info("Caught expected exception : " + e);
+    }
+  }
+
+  @Test
+  public void testSquashCGroupOperationsWithValidOperations() {
+    List<PrivilegedOperation> ops = new ArrayList<>();
+    //Test squashing, including 'none'
+    ops.clear();
+    ops.add(opTasks1);
+    //this is expected to be ignored
+    ops.add(opTasksNone);
+    ops.add(opTasks2);
+    ops.add(opTasks3);
+
+    try {
+      PrivilegedOperation op = PrivilegedOperationExecutor
+          .squashCGroupOperations(ops);
+      String expected = new StringBuffer
+          (PrivilegedOperation.CGROUP_ARG_PREFIX)
+          .append(cGroupTasks1).append(',')
+          .append(cGroupTasks2).append(',')
+          .append(cGroupTasks3).toString();
+
+      //We expect exactly one argument
+      Assert.assertEquals(1, op.getArguments().size());
+      //Squashed list of tasks files
+      Assert.assertEquals(expected, op.getArguments().get(0));
+    } catch (PrivilegedOperationException e) {
+      LOG.info("Caught unexpected exception : " + e);
+      Assert.fail("Caught unexpected exception: " + e);
+    }
+  }
+}
\ No newline at end of file
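The two squash tests above pin down the point of squashCGroupOperations: adding a container's PID to several cgroup tasks files should cost a single container-executor invocation rather than one per controller. A rough usage sketch follows, based only on the API exercised by this test; the container id and tasks-file paths are made up:

    import java.util.ArrayList;
    import java.util.List;

    public class SquashSketch {
      static PrivilegedOperation combine() throws PrivilegedOperationException {
        List<PrivilegedOperation> ops = new ArrayList<>();
        //one operation per controller, each naming that controller's tasks file
        ops.add(new PrivilegedOperation(
            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
            PrivilegedOperation.CGROUP_ARG_PREFIX
                + "cpu/hadoop_yarn/container_42/tasks"));
        ops.add(new PrivilegedOperation(
            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
            PrivilegedOperation.CGROUP_ARG_PREFIX
                + "blkio/hadoop_yarn/container_42/tasks"));
        //the result is a single operation whose one argument is
        //CGROUP_ARG_PREFIX followed by the comma-separated tasks files,
        //as asserted in testSquashCGroupOperationsWithValidOperations
        return PrivilegedOperationExecutor.squashCGroupOperations(ops);
      }
    }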
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae7a5ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
new file mode 100644
index 0000000..0717447
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java
@@ -0,0 +1,235 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class TestCGroupsHandlerImpl {
+  private static final Log LOG =
+      LogFactory.getLog(TestCGroupsHandlerImpl.class);
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+  private Configuration conf;
+  private String tmpPath;
+  private String hierarchy;
+  private CGroupsHandler.CGroupController controller;
+  private String controllerPath;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    conf = new YarnConfiguration();
+    tmpPath = System.getProperty("test.build.data") + "/cgroups";
+    //no leading or trailing slashes here
+    hierarchy = "test-hadoop-yarn";
+
+    conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, hierarchy);
+    conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
+    conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, tmpPath);
+    controller = CGroupsHandler.CGroupController.NET_CLS;
+    controllerPath = new StringBuffer(tmpPath).append('/')
+        .append(controller.getName()).append('/').append(hierarchy).toString();
+  }
+
+  @Test
+  public void testMountController() {
+    CGroupsHandler cGroupsHandler = null;
+    //Since we enabled (deferred) cgroup controller mounting, no interactions
+    //should have occurred with this mock
+    verifyZeroInteractions(privilegedOperationExecutorMock);
+
+    try {
+      cGroupsHandler = new CGroupsHandlerImpl(conf,
+          privilegedOperationExecutorMock);
+      PrivilegedOperation expectedOp = new PrivilegedOperation
+          (PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
+      //This is expected to be of the form:
+      //net_cls=<mount_path>/net_cls
+      StringBuffer controllerKV = new StringBuffer(controller.getName())
+          .append('=').append(tmpPath).append('/').append(controller.getName());
+      expectedOp.appendArgs(hierarchy, controllerKV.toString());
+
+      cGroupsHandler.mountCGroupController(controller);
+      try {
+        ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+            (PrivilegedOperation.class);
+        verify(privilegedOperationExecutorMock)
+            .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+        //we'll explicitly capture and assert that the
+        //captured op and the expected op are identical.
+        Assert.assertEquals(expectedOp, opCaptor.getValue());
+        verifyNoMoreInteractions(privilegedOperationExecutorMock);
+
+        //Try mounting the same controller again - this should be a no-op
+        cGroupsHandler.mountCGroupController(controller);
+        verifyNoMoreInteractions(privilegedOperationExecutorMock);
+      } catch (PrivilegedOperationException e) {
+        LOG.error("Caught exception: " + e);
+        Assert.assertTrue("Unexpected PrivilegedOperationException from mock!",
+            false);
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.assertTrue("Unexpected ResourceHandler Exception!", false);
+    }
+  }
+
+  @Test
+  public void testCGroupPaths() {
+    //As per junit behavior, we expect a new mock object to be available
+    //in this test.
+    verifyZeroInteractions(privilegedOperationExecutorMock);
+    CGroupsHandler cGroupsHandler = null;
+    try {
+      cGroupsHandler = new CGroupsHandlerImpl(conf,
+          privilegedOperationExecutorMock);
+      cGroupsHandler.mountCGroupController(controller);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.assertTrue(
+          "Unexpected ResourceHandlerException when mounting controller!",
+          false);
+    }
+
+    String testCGroup = "container_01";
+    String expectedPath = new StringBuffer(controllerPath).append('/')
+        .append(testCGroup).toString();
+    String path = cGroupsHandler.getPathForCGroup(controller, testCGroup);
+    Assert.assertEquals(expectedPath, path);
+
+    String expectedPathTasks = new StringBuffer(expectedPath).append('/')
+        .append(CGroupsHandler.CGROUP_FILE_TASKS).toString();
+    path = cGroupsHandler.getPathForCGroupTasks(controller, testCGroup);
+    Assert.assertEquals(expectedPathTasks, path);
+
+    String param = CGroupsHandler.CGROUP_PARAM_CLASSID;
+    String expectedPathParam = new StringBuffer(expectedPath).append('/')
+        .append(controller.getName()).append('.').append(param).toString();
+    path = cGroupsHandler.getPathForCGroupParam(controller, testCGroup, param);
+    Assert.assertEquals(expectedPathParam, path);
+  }
+
+  @Test
+  public void testCGroupOperations() {
+    //As per junit behavior, we expect a new mock object to be available
+    //in this test.
+    verifyZeroInteractions(privilegedOperationExecutorMock);
+    CGroupsHandler cGroupsHandler = null;
+
+    try {
+      cGroupsHandler = new CGroupsHandlerImpl(conf,
+          privilegedOperationExecutorMock);
+      cGroupsHandler.mountCGroupController(controller);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.assertTrue(
+          "Unexpected ResourceHandlerException when mounting controller!",
+          false);
+    }
+    //Let's manually create a path to (partially) simulate a mounted controller
+    //this is required because the handler uses a mocked privileged operation
+    //executor
+    new File(controllerPath).mkdirs();
+
+    String testCGroup = "container_01";
+    String expectedPath = new StringBuffer(controllerPath).append('/')
+        .append(testCGroup).toString();
+    try {
+      String path = cGroupsHandler.createCGroup(controller, testCGroup);
+
+      Assert.assertTrue(new File(expectedPath).exists());
+      Assert.assertEquals(expectedPath, path);
+
+      //update param and read param tests.
+      //We don't use net_cls.classid as a test param here because
+      //cgroups provides very specific read/write semantics for classid (only
+      //numbers can be written - potentially as hex but can be read out only
+      //as decimal)
+      String param = "test_param";
+      String paramValue = "test_param_value";
+
+      cGroupsHandler
+          .updateCGroupParam(controller, testCGroup, param, paramValue);
+      String paramPath = new StringBuffer(expectedPath).append('/')
+          .append(controller.getName()).append('.').append(param).toString();
+      File paramFile = new File(paramPath);
+
+      Assert.assertTrue(paramFile.exists());
+      try {
+        Assert.assertEquals(paramValue, new String(Files.readAllBytes
+            (paramFile.toPath())));
+      } catch (IOException e) {
+        LOG.error("Caught exception: " + e);
+        Assert.assertTrue("Unexpected IOException trying to read cgroup param!",
+            false);
+      }
+
+      Assert.assertEquals(paramValue, cGroupsHandler.getCGroupParam
+          (controller, testCGroup, param));
+
+      //We can't really do a delete test here. Linux cgroups
+      //implementation provides additional semantics - the cgroup cannot be
+      //deleted if there are any tasks still running in the cgroup even if
+      //the user attempting the delete has the file permissions to do so - we
+      //cannot simulate that here. Even if we create a dummy 'tasks' file, we
+      //wouldn't be able to simulate the delete behavior we need, since a cgroup
+      //can be deleted using 'rmdir' if the tasks file is empty. Such a
+      //delete is not possible with a regular non-empty directory.
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.assertTrue(
+          "Unexpected ResourceHandlerException during cgroup operations!",
+          false);
+    }
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
\ No newline at end of file
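Taken together, these tests trace the intended lifecycle of the new CGroupsHandler API: deferred controller mounting, per-container cgroup creation, parameter writes and reads, and path resolution for the tasks file. A condensed sketch of that call sequence, using only methods exercised above; the container id and classid value are illustrative, and error handling is elided:

    public class CGroupsHandlerUsageSketch {
      static void tagContainer(CGroupsHandler handler, String containerId)
          throws ResourceHandlerException {
        CGroupsHandler.CGroupController netCls =
            CGroupsHandler.CGroupController.NET_CLS;
        //mounting is deferred until explicitly requested
        //(as testMountController verifies)
        handler.mountCGroupController(netCls);
        //create a per-container cgroup, then set and read back a parameter
        handler.createCGroup(netCls, containerId);
        handler.updateCGroupParam(netCls, containerId,
            CGroupsHandler.CGROUP_PARAM_CLASSID, "0x10001");
        String classid = handler.getCGroupParam(netCls, containerId,
            CGroupsHandler.CGROUP_PARAM_CLASSID);
        //a launched process's PID would then be written to the tasks file at:
        String tasksFile = handler.getPathForCGroupTasks(netCls, containerId);
      }
    }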