hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1076939 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/security/ core/org/apache/hadoop/util/ hdfs/org/apache/hadoop/hdfs/server/namenode/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/map...
Date Fri, 04 Mar 2011 03:24:13 GMT
Author: omalley
Date: Fri Mar  4 03:24:12 2011
New Revision: 1076939

URL: http://svn.apache.org/viewvc?rev=1076939&view=rev
Log:
commit 5d6652cfd28fa9b0bcc3af4c54e2d5c484ab4d7b
Author: Lee Tucker <ltucker@yahoo-inc.com>
Date:   Thu Jul 30 17:40:23 2009 -0700

    Applying patch 2703395.5643.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/PermissionChecker.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/HostsFileReader.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/PermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/PermissionChecker.java?rev=1076939&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/PermissionChecker.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/PermissionChecker.java Fri Mar  4 03:24:12 2011
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** Perform permission checking. */
+public class PermissionChecker {
+  static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+  public final String user;
+  protected final Set<String> groups = new HashSet<String>();
+  public final boolean isSuper;
+
+  /**
+   * Checks if the caller has the required permission.
+   * @param owner username of the owner
+   * @param supergroup supergroup that the owner belongs to
+   */
+  public PermissionChecker(String owner, String supergroup
+      ) throws AccessControlException{
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ugi=" + ugi);
+    }
+
+    if (ugi != null) {
+      user = ugi.getUserName();
+      groups.addAll(Arrays.asList(ugi.getGroupNames()));
+      isSuper = user.equals(owner) || groups.contains(supergroup);
+    }
+    else {
+      throw new AccessControlException("ugi = null");
+    }
+  }
+
+  /**
+   * Check if the caller's groups contain the given group.
+   * @param group group to check
+   */
+  public boolean containsGroup(String group) {return groups.contains(group);}
+
+  /**
+   * Verify if the caller has the required permission. This will result in 
+   * an exception if the caller is not allowed to access the resource.
+   * @param owner owner of the system
+   * @param supergroup supergroup of the system
+   */
+  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
+                                             String supergroup) 
+  throws AccessControlException {
+    PermissionChecker checker = 
+      new PermissionChecker(owner.getUserName(), supergroup);
+    if (!checker.isSuper) {
+      throw new AccessControlException("Access denied for user " 
+          + checker.user + ". Superuser privilege is required");
+    }
+  }
+}
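
For reference, the checker added above is the piece shared between HDFS and MapReduce in this patch. A minimal sketch of how a daemon could guard a superuser-only operation with it, under the assumption of placeholder owner and supergroup values (the real callers pass fsOwner/mrOwner and the configured supergroup):

import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.PermissionChecker;
import org.apache.hadoop.security.UserGroupInformation;

class AdminGuardSketch {
  // ownerUgi and "supergroup" are placeholders for the daemon's real owner
  // UGI and its configured supergroup name.
  static void doPrivilegedOperation(UserGroupInformation ownerUgi)
      throws AccessControlException {
    // Throws AccessControlException unless the calling UGI is the owner
    // or one of its groups matches the supergroup.
    PermissionChecker.checkSuperuserPrivilege(ownerUgi, "supergroup");
    // ... privileged work goes here ...
  }
}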

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/HostsFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/HostsFileReader.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/HostsFileReader.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/HostsFileReader.java Fri Mar  4 03:24:12 2011
@@ -22,13 +22,18 @@ import java.io.*;
 import java.util.Set;
 import java.util.HashSet;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 
-// Keeps track of which datanodes are allowed to connect to the namenode.
+// Keeps track of which datanodes/tasktrackers are allowed to connect to the 
+// namenode/jobtracker.
 public class HostsFileReader {
   private Set<String> includes;
   private Set<String> excludes;
   private String includesFile;
   private String excludesFile;
+  
+  private static final Log LOG = LogFactory.getLog(HostsFileReader.class);
 
   public HostsFileReader(String inFile, 
                          String exFile) throws IOException {
@@ -40,7 +45,11 @@ public class HostsFileReader {
   }
 
   private void readFileToSet(String filename, Set<String> set) throws IOException {
-    FileInputStream fis = new FileInputStream(new File(filename));
+    File file = new File(filename);
+    if (!file.exists()) {
+      return;
+    }
+    FileInputStream fis = new FileInputStream(file);
     BufferedReader reader = null;
     try {
       reader = new BufferedReader(new InputStreamReader(fis));
@@ -64,30 +73,36 @@ public class HostsFileReader {
   }
 
   public synchronized void refresh() throws IOException {
-    includes.clear();
-    excludes.clear();
-    
+    LOG.info("Refreshing hosts (include/exclude) list");
     if (!includesFile.equals("")) {
-      readFileToSet(includesFile, includes);
+      Set<String> newIncludes = new HashSet<String>();
+      readFileToSet(includesFile, newIncludes);
+      // switch the new hosts that are to be included
+      includes = newIncludes;
     }
     if (!excludesFile.equals("")) {
-      readFileToSet(excludesFile, excludes);
+      Set<String> newExcludes = new HashSet<String>();
+      readFileToSet(excludesFile, newExcludes);
+      // switch the excluded hosts
+      excludes = newExcludes;
     }
   }
 
-  public Set<String> getHosts() {
+  public synchronized Set<String> getHosts() {
     return includes;
   }
 
-  public Set<String> getExcludedHosts() {
+  public synchronized Set<String> getExcludedHosts() {
     return excludes;
   }
 
   public synchronized void setIncludesFile(String includesFile) {
+    LOG.info("Setting the includes file to " + includesFile);
     this.includesFile = includesFile;
   }
   
   public synchronized void setExcludesFile(String excludesFile) {
+    LOG.info("Setting the excludes file to " + excludesFile);
     this.excludesFile = excludesFile;
   }
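
A short sketch of the include/exclude lifecycle this class now supports, assuming hypothetical file names (an empty string means no file is configured, and with this patch a missing file is simply skipped rather than failing):

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.util.HostsFileReader;

public class HostsReaderSketch {
  public static void main(String[] args) throws IOException {
    // "hosts.include" and "hosts.exclude" are hypothetical paths.
    HostsFileReader reader = new HostsFileReader("hosts.include", "hosts.exclude");
    reader.refresh();  // re-read both files; swaps in freshly built sets
    Set<String> allowed = reader.getHosts();          // now a synchronized read
    Set<String> excluded = reader.getExcludedHosts(); // now a synchronized read
    System.out.println(allowed.size() + " included, " + excluded.size() + " excluded");
  }
}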
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 03:24:12 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
@@ -500,7 +501,7 @@ public class FSNamesystem implements FSC
    * Dump all metadata into specified file
    */
   synchronized void metaSave(String filename) throws IOException {
-    checkSuperuserPrivilege();
+    checkAccess();
     File file = new File(System.getProperty("hadoop.log.dir"), 
                          filename);
     PrintWriter out = new PrintWriter(new BufferedWriter(
@@ -605,7 +606,7 @@ public class FSNamesystem implements FSC
    */
   synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
       throws IOException {
-    checkSuperuserPrivilege();
+    checkAccess();
 
     DatanodeDescriptor node = getDatanode(datanode);
     if (node == null) {
@@ -695,7 +696,7 @@ public class FSNamesystem implements FSC
    */
   public synchronized void setOwner(String src, String username, String group
       ) throws IOException {
-    PermissionChecker pc = checkOwner(src);
+    FSPermissionChecker pc = checkOwner(src);
     if (!pc.isSuper) {
       if (username != null && !pc.user.equals(username)) {
         throw new AccessControlException("Non-super user cannot change owner.");
@@ -1787,7 +1788,7 @@ public class FSNamesystem implements FSC
    */
   void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
     if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
+      checkAccess();
     }
     
     dir.setQuota(path, nsQuota, dsQuota);
@@ -3409,7 +3410,7 @@ public class FSNamesystem implements FSC
   }
   
   long[] getStats() throws IOException {
-    checkSuperuserPrivilege();
+    checkAccess();
     synchronized(heartbeats) {
       return new long[] {this.capacityTotal, this.capacityUsed, 
                          this.capacityRemaining,
@@ -3549,7 +3550,7 @@ public class FSNamesystem implements FSC
 
   public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
       ) throws AccessControlException {
-    checkSuperuserPrivilege();
+    checkAccess();
 
     ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
     DatanodeInfo[] arr = new DatanodeInfo[results.size()];
@@ -3568,7 +3569,7 @@ public class FSNamesystem implements FSC
    * @throws IOException if 
    */
   synchronized void saveNamespace() throws AccessControlException, IOException {
-    checkSuperuserPrivilege();
+    checkAccess();
     if(!isInSafeMode()) {
       throw new IOException("Safe mode should be turned ON " +
                             "in order to create namespace image.");
@@ -3824,7 +3825,7 @@ public class FSNamesystem implements FSC
    * 4. Removed from exclude --> stop decommission.
    */
   public void refreshNodes(Configuration conf) throws IOException {
-    checkSuperuserPrivilege();
+    checkAccess();
     // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
     if (conf == null)
@@ -3858,7 +3859,7 @@ public class FSNamesystem implements FSC
   }
     
   void finalizeUpgrade() throws IOException {
-    checkSuperuserPrivilege();
+    checkAccess();
     getFSImage().finalizeUpgrade();
   }
 
@@ -4301,7 +4302,7 @@ public class FSNamesystem implements FSC
     
   boolean setSafeMode(SafeModeAction action) throws IOException {
     if (action != SafeModeAction.SAFEMODE_GET) {
-      checkSuperuserPrivilege();
+      checkAccess();
       switch(action) {
       case SAFEMODE_LEAVE: // leave safe mode
         leaveSafeMode(false);
@@ -4459,49 +4460,45 @@ public class FSNamesystem implements FSC
     return new PermissionStatus(fsOwner.getUserName(), supergroup, permission);
   }
 
-  private PermissionChecker checkOwner(String path) throws AccessControlException {
+  private FSPermissionChecker checkOwner(String path) throws AccessControlException {
     return checkPermission(path, true, null, null, null, null);
   }
 
-  private PermissionChecker checkPathAccess(String path, FsAction access
+  private FSPermissionChecker checkPathAccess(String path, FsAction access
       ) throws AccessControlException {
     return checkPermission(path, false, null, null, access, null);
   }
 
-  private PermissionChecker checkParentAccess(String path, FsAction access
+  private FSPermissionChecker checkParentAccess(String path, FsAction access
       ) throws AccessControlException {
     return checkPermission(path, false, null, access, null, null);
   }
 
-  private PermissionChecker checkAncestorAccess(String path, FsAction access
+  private FSPermissionChecker checkAncestorAccess(String path, FsAction access
       ) throws AccessControlException {
     return checkPermission(path, false, access, null, null, null);
   }
 
-  private PermissionChecker checkTraverse(String path
+  private FSPermissionChecker checkTraverse(String path
       ) throws AccessControlException {
     return checkPermission(path, false, null, null, null, null);
   }
 
-  private void checkSuperuserPrivilege() throws AccessControlException {
+  private void checkAccess() throws AccessControlException {
     if (isPermissionEnabled) {
-      PermissionChecker pc = new PermissionChecker(
-          fsOwner.getUserName(), supergroup);
-      if (!pc.isSuper) {
-        throw new AccessControlException("Superuser privilege is required");
-      }
+      PermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
     }
   }
 
   /**
    * Check whether current user have permissions to access the path.
    * For more details of the parameters, see
-   * {@link PermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
+   * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
    */
-  private PermissionChecker checkPermission(String path, boolean doCheckOwner,
+  private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
       FsAction subAccess) throws AccessControlException {
-    PermissionChecker pc = new PermissionChecker(
+    FSPermissionChecker pc = new FSPermissionChecker(
         fsOwner.getUserName(), supergroup);
     if (!pc.isSuper) {
       dir.waitForReady();
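
The checkSuperuserPrivilege() to checkAccess() rename above is functionally equivalent: the inline test is simply moved behind the new static helper so the JobTracker can reuse it. A condensed sketch of the before/after, using the same names as the diff:

import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.PermissionChecker;
import org.apache.hadoop.security.UserGroupInformation;

class SuperuserCheckSketch {
  // Before the patch: the test was inlined in FSNamesystem.
  static void oldStyle(UserGroupInformation fsOwner, String supergroup)
      throws AccessControlException {
    PermissionChecker pc = new PermissionChecker(fsOwner.getUserName(), supergroup);
    if (!pc.isSuper) {
      throw new AccessControlException("Superuser privilege is required");
    }
  }

  // After the patch: the same test via the shared helper.
  static void newStyle(UserGroupInformation fsOwner, String supergroup)
      throws AccessControlException {
    PermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
  }
}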

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1076939&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Mar  4 03:24:12 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.PermissionChecker;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** Perform permission checking in {@link FSNamesystem}. */
+class FSPermissionChecker extends PermissionChecker {
+  static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+  FSPermissionChecker(String fsOwner, String supergroup
+      ) throws AccessControlException{
+    super(fsOwner, supergroup);
+  }
+
+  /**
+   * Check whether the current user has permission to access the path.
+   * Traverse is always checked.
+   *
+   * Parent path means the parent directory for the path.
+   * Ancestor path means the last (the closest) existing ancestor directory
+   * of the path.
+   * Note that if the parent path exists,
+   * then the parent path and the ancestor path are the same.
+   *
+   * For example, suppose the path is "/foo/bar/baz".
+   * Whether baz is a file or a directory,
+   * the parent path is "/foo/bar".
+   * If bar exists, then the ancestor path is also "/foo/bar".
+   * If bar does not exist and foo exists,
+   * then the ancestor path is "/foo".
+   * Further, if both foo and bar do not exist,
+   * then the ancestor path is "/".
+   *
+   * @param doCheckOwner Require user to be the owner of the path?
+   * @param ancestorAccess The access required by the ancestor of the path.
+   * @param parentAccess The access required by the parent of the path.
+   * @param access The access required by the path.
+   * @param subAccess If path is a directory,
+   * it is the access required of the path and all the sub-directories.
+   * If path is not a directory, there is no effect.
+   * @return a PermissionChecker object which caches data for later use.
+   * @throws AccessControlException
+   */
+  void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
+      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+      FsAction subAccess) throws AccessControlException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ACCESS CHECK: " + this
+          + ", doCheckOwner=" + doCheckOwner
+          + ", ancestorAccess=" + ancestorAccess
+          + ", parentAccess=" + parentAccess
+          + ", access=" + access
+          + ", subAccess=" + subAccess);
+    }
+
+    synchronized(root) {
+      INode[] inodes = root.getExistingPathINodes(path);
+      int ancestorIndex = inodes.length - 2;
+      for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+          ancestorIndex--);
+      checkTraverse(inodes, ancestorIndex);
+
+      if (ancestorAccess != null && inodes.length > 1) {
+        check(inodes, ancestorIndex, ancestorAccess);
+      }
+      if (parentAccess != null && inodes.length > 1) {
+        check(inodes, inodes.length - 2, parentAccess);
+      }
+      if (access != null) {
+        check(inodes[inodes.length - 1], access);
+      }
+      if (subAccess != null) {
+        checkSubAccess(inodes[inodes.length - 1], subAccess);
+      }
+      if (doCheckOwner) {
+        checkOwner(inodes[inodes.length - 1]);
+      }
+    }
+  }
+
+  private void checkOwner(INode inode) throws AccessControlException {
+    if (inode != null && user.equals(inode.getUserName())) {
+      return;
+    }
+    throw new AccessControlException("Permission denied");
+  }
+
+  private void checkTraverse(INode[] inodes, int last
+      ) throws AccessControlException {
+    for(int j = 0; j <= last; j++) {
+      check(inodes[j], FsAction.EXECUTE);
+    }
+  }
+
+  private void checkSubAccess(INode inode, FsAction access
+      ) throws AccessControlException {
+    if (inode == null || !inode.isDirectory()) {
+      return;
+    }
+
+    Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
+    for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
+      INodeDirectory d = directories.pop();
+      check(d, access);
+
+      for(INode child : d.getChildren()) {
+        if (child.isDirectory()) {
+          directories.push((INodeDirectory)child);
+        }
+      }
+    }
+  }
+
+  private void check(INode[] inodes, int i, FsAction access
+      ) throws AccessControlException {
+    check(i >= 0? inodes[i]: null, access);
+  }
+
+  private void check(INode inode, FsAction access
+      ) throws AccessControlException {
+    if (inode == null) {
+      return;
+    }
+    FsPermission mode = inode.getFsPermission();
+
+    if (user.equals(inode.getUserName())) { //user class
+      if (mode.getUserAction().implies(access)) { return; }
+    }
+    else if (groups.contains(inode.getGroupName())) { //group class
+      if (mode.getGroupAction().implies(access)) { return; }
+    }
+    else { //other class
+      if (mode.getOtherAction().implies(access)) { return; }
+    }
+    throw new AccessControlException("Permission denied: user=" + user
+        + ", access=" + access + ", inode=" + inode);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java Fri Mar  4 03:24:12 2011
@@ -30,11 +30,17 @@ public interface AdminOperationsProtocol
   
   /**
    * Version 1: Initial version. Added refreshQueueAcls.
+   * Version 2: Added node refresh facility
    */
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;
 
   /**
    * Refresh the queue acls in use currently.
    */
   void refreshQueueAcls() throws IOException;
+  
+  /**
+   * Refresh the node list at the {@link JobTracker} 
+   */
+  void refreshNodes() throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Fri Mar  4 03:24:12 2011
@@ -61,6 +61,7 @@ public class ClusterStatus implements Wr
   private Collection<String> activeTrackers = new ArrayList<String>();
   private Collection<String> blacklistedTrackers = new ArrayList<String>();
   private int numBlacklistedTrackers;
+  private int numExcludedNodes;
   private long ttExpiryInterval;
   private int map_tasks;
   private int reduce_tasks;
@@ -105,8 +106,19 @@ public class ClusterStatus implements Wr
   ClusterStatus(int trackers, int blacklists, long ttExpiryInterval, 
                 int maps, int reduces,
                 int maxMaps, int maxReduces, JobTracker.State state) {
+    this(trackers, blacklists, ttExpiryInterval, maps, reduces, maxMaps, 
+         maxReduces, state, 0);
+  }
+
+  /**
+   * @param numDecommissionedNodes number of decommissioned trackers
+   */
+  ClusterStatus(int trackers, int blacklists, long ttExpiryInterval, 
+                int maps, int reduces, int maxMaps, int maxReduces, 
+                JobTracker.State state, int numDecommissionedNodes) {
     numActiveTrackers = trackers;
     numBlacklistedTrackers = blacklists;
+    this.numExcludedNodes = numDecommissionedNodes;
     this.ttExpiryInterval = ttExpiryInterval;
     map_tasks = maps;
     reduce_tasks = reduces;
@@ -134,8 +146,19 @@ public class ClusterStatus implements Wr
       long ttExpiryInterval,
       int maps, int reduces, int maxMaps, int maxReduces, 
       JobTracker.State state) {
+    this(activeTrackers, blacklistedTrackers, ttExpiryInterval, maps, reduces, 
+         maxMaps, maxReduces, state, 0);
+  }
+
+  /**
+   * @param numDecommissionNodes number of decommissioned trackers
+   */
+  ClusterStatus(Collection<String> activeTrackers, 
+                Collection<String> blacklistedTrackers, long ttExpiryInterval,
+                int maps, int reduces, int maxMaps, int maxReduces, 
+                JobTracker.State state, int numDecommissionNodes) {
     this(activeTrackers.size(), blacklistedTrackers.size(), ttExpiryInterval, 
-        maps, reduces, maxMaps, maxReduces, state);
+        maps, reduces, maxMaps, maxReduces, state, numDecommissionNodes);
     this.activeTrackers = activeTrackers;
     this.blacklistedTrackers = blacklistedTrackers;
   }
@@ -178,6 +201,14 @@ public class ClusterStatus implements Wr
   }
   
   /**
+   * Get the number of excluded hosts in the cluster.
+   * @return the number of excluded hosts in the cluster.
+   */
+  public int getNumExcludedNodes() {
+    return numExcludedNodes;
+  }
+  
+  /**
    * Get the tasktracker expiry interval for the cluster
    * @return the expiry interval in msec
    */
@@ -270,6 +301,7 @@ public class ClusterStatus implements Wr
         Text.writeString(out, tracker);
       }
     }
+    out.writeInt(numExcludedNodes);
     out.writeLong(ttExpiryInterval);
     out.writeInt(map_tasks);
     out.writeInt(reduce_tasks);
@@ -297,6 +329,7 @@ public class ClusterStatus implements Wr
         blacklistedTrackers.add(name);
       }
     }
+    numExcludedNodes = in.readInt();
     ttExpiryInterval = in.readLong();
     map_tasks = in.readInt();
     reduce_tasks = in.readInt();
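
A small sketch of reading the new counter from a client, assuming the JobConf picked up from the classpath points at a live JobTracker:

import java.io.IOException;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ExcludedNodesReport {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();           // assumes mapred-site.xml names a live JobTracker
    JobClient client = new JobClient(conf);
    ClusterStatus status = client.getClusterStatus();
    // New in this patch: trackers excluded via mapred.hosts / mapred.hosts.exclude.
    System.out.println("Excluded nodes: " + status.getNumExcludedNodes());
    System.out.println("Blacklisted:    " + status.getBlacklistedTrackers());
  }
}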

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:24:12 2011
@@ -50,6 +50,8 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +79,9 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
@@ -377,6 +381,10 @@ public class JobTracker implements MRCon
                         faultyTrackers.numBlacklistedTrackers -= 1;
                       }
                       updateTaskTrackerStatus(trackerName, null);
+                      
+                      // remove the mapping from the hosts list
+                      String hostname = newProfile.getHost();
+                      hostnameToTrackerName.get(hostname).remove(trackerName);
                     } else {
                       // Update time by inserting latest profile
                       trackerExpiryQueue.add(newProfile);
@@ -1508,6 +1516,12 @@ public class JobTracker implements MRCon
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
+  // (hostname --> Set(trackername))
+  // This is used to keep track of all trackers running on one host. While
+  // decommissioning the host, all the trackers on the host will be lost.
+  Map<String, Set<String>> hostnameToTrackerName = 
+    Collections.synchronizedMap(new TreeMap<String, Set<String>>());
+  
 
   // (taskid --> trackerID) 
   TreeMap<TaskAttemptID, String> taskidToTrackerMap = new TreeMap<TaskAttemptID, String>();
@@ -1594,6 +1608,8 @@ public class JobTracker implements MRCon
   FileSystem fs = null;
   Path systemDir = null;
   private JobConf conf;
+  private final UserGroupInformation mrOwner;
+  private final String supergroup;
 
   long limitMaxMemForMapTasks;
   long limitMaxMemForReduceTasks;
@@ -1611,6 +1627,16 @@ public class JobTracker implements MRCon
   
   JobTracker(JobConf conf, String identifier) 
   throws IOException, InterruptedException {   
+    // find the owner of the process
+    try {
+      mrOwner = UnixUserGroupInformation.login(conf);
+    } catch (LoginException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    LOG.info("Starting jobtracker with owner as " + mrOwner.getUserName() 
+             + " and supergroup as " + supergroup);
+
     //
     // Grab some static constants
     //
@@ -2457,11 +2483,23 @@ public class JobTracker implements MRCon
    */
   private void addNewTracker(TaskTrackerStatus status) {
     trackerExpiryQueue.add(status);
+
     //  Register the tracker if its not registered
+    String hostname = status.getHost();
     if (getNode(status.getTrackerName()) == null) {
       // Making the network location resolution inline .. 
-      resolveAndAddToTopology(status.getHost());
+      resolveAndAddToTopology(hostname);
     }
+
+    // add it to the set of tracker per host
+    Set<String> trackers = hostnameToTrackerName.get(hostname);
+    if (trackers == null) {
+      trackers = Collections.synchronizedSet(new HashSet<String>());
+      hostnameToTrackerName.put(hostname, trackers);
+    }
+    LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
+             + hostname);
+    trackers.add(status.getTrackerName());
   }
 
   public Node resolveAndAddToTopology(String name) {
@@ -3241,7 +3279,8 @@ public class JobTracker implements MRCon
             totalReduces,
             totalMapTaskCapacity,
             totalReduceTaskCapacity, 
-            state);
+            state, getExcludedNodes().size()
+            );
       } else {
         return new ClusterStatus(taskTrackers.size() - 
             getBlacklistedTrackerCount(),
@@ -3251,7 +3290,7 @@ public class JobTracker implements MRCon
             totalReduces,
             totalMapTaskCapacity,
             totalReduceTaskCapacity, 
-            state);          
+            state, getExcludedNodes().size());          
       }
     }
   }
@@ -3829,6 +3868,64 @@ public class JobTracker implements MRCon
     }
   }
   
+  /**
+   * Rereads the config to get hosts and exclude list file names.
+   * Rereads the files to update the hosts and exclude lists.
+   */
+  public synchronized void refreshNodes() throws IOException {
+    // check access
+    PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
+
+    // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
+    // Update the file names and refresh internal includes and excludes list
+    LOG.info("Refreshing hosts information");
+    Configuration conf = new Configuration();
+
+    hostsReader.updateFileNames(conf.get("mapred.hosts",""), 
+                                conf.get("mapred.hosts.exclude", ""));
+    hostsReader.refresh();
+    
+    Set<String> excludeSet = new HashSet<String>();
+    for(Map.Entry<String, TaskTrackerStatus> eSet : taskTrackers.entrySet()) {
+      String trackerName = eSet.getKey();
+      TaskTrackerStatus status = eSet.getValue();
+      // Check if the tracker is not in the hosts list, or is in the excluded hosts list
+      if (!inHostsList(status) || inExcludedHostsList(status)) {
+          excludeSet.add(status.getHost()); // add to rejected trackers
+      }
+    }
+    decommissionNodes(excludeSet);
+  }
+
+  // main decommission
+  private synchronized void decommissionNodes(Set<String> hosts) 
+  throws IOException {  
+    LOG.info("Decommissioning " + hosts.size() + " nodes");
+    // create a list of tracker hostnames
+    synchronized (taskTrackers) {
+      synchronized (trackerExpiryQueue) {
+        for (String host : hosts) {
+          LOG.info("Decommissioning host " + host);
+          Set<String> trackers = hostnameToTrackerName.remove(host);
+          if (trackers != null) {
+            for (String tracker : trackers) {
+              LOG.info("Losing tracker " + tracker + " on host " + host);
+              lostTaskTracker(tracker); // lose the tracker
+              updateTaskTrackerStatus(tracker, null);
+            }
+          }
+          LOG.info("Host " + host + " is ready for decommissioning");
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a set of excluded nodes.
+   */
+  Collection<String> getExcludedNodes() {
+    return hostsReader.getExcludedHosts();
+  }
 
   /**
    * Get the localized job file path on the job trackers local file system
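
Operationally, the refreshNodes() path added above re-reads mapred.hosts and mapred.hosts.exclude and then decommissions every tracker whose host fails the include/exclude test. A self-contained sketch of that predicate, following the usual convention that an empty includes list allows every host (host names here are hypothetical):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ExclusionRuleSketch {
  public static void main(String[] args) {
    Set<String> includes = new HashSet<String>(Arrays.asList("host1.com", "host2.com"));
    Set<String> excludes = new HashSet<String>(Arrays.asList("host2.com"));
    for (String host : Arrays.asList("host1.com", "host2.com", "host3.com")) {
      // Mirrors: !inHostsList(status) || inExcludedHostsList(status)
      boolean inHostsList = includes.isEmpty() || includes.contains(host);
      boolean decommission = !inHostsList || excludes.contains(host);
      System.out.println(host + " -> " + (decommission ? "decommission" : "keep"));
    }
  }
}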

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java Fri Mar  4 03:24:12 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.AdminOpe
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -53,7 +54,8 @@ public class MRAdmin extends Configured 
   private static void printHelp(String cmd) {
     String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
     "The full syntax is: \n\n" +
-    "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]]\n"; 
+    "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]] "
+    + "[-refreshNodes]\n"; 
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
@@ -62,6 +64,9 @@ public class MRAdmin extends Configured 
         "-refreshQueueAcls: Reload the queue acls\n"
             + "\t\tJobTracker will reload the mapred-queue-acls.xml file.\n";
 
+  String refreshNodes =
+    "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
+  
   String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
     "\t\tis specified.\n";
 
@@ -69,12 +74,15 @@ public class MRAdmin extends Configured 
     System.out.println(refreshServiceAcl);
   } else if ("refreshQueueAcls".equals(cmd)) {
     System.out.println(refreshQueueAcls);
+  }  else if ("refreshNodes".equals(cmd)) {
+    System.out.println(refreshNodes);
   } else if ("help".equals(cmd)) {
     System.out.println(help);
   } else {
     System.out.println(summary);
     System.out.println(refreshServiceAcl);
     System.out.println(refreshQueueAcls);
+    System.out.println(refreshNodes);
     System.out.println(help);
     System.out.println();
     ToolRunner.printGenericCommandUsage(System.out);
@@ -91,10 +99,13 @@ public class MRAdmin extends Configured 
       System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
     } else if ("-refreshQueueAcls".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshQueueAcls]");
+    } else if ("-refreshNodes".equals(cmd)) {
+      System.err.println("Usage: java MRAdmin" + " [-refreshNodes]");
     } else {
       System.err.println("Usage: java MRAdmin");
       System.err.println("           [-refreshServiceAcl]");
       System.err.println("           [-refreshQueueAcls]");
+      System.err.println("           [-refreshNodes]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -151,6 +162,31 @@ public class MRAdmin extends Configured 
     return 0;
   }
 
+  /**
+   * Command to ask the jobtracker to reread the hosts and excluded hosts 
+   * file.
+   * Usage: java MRAdmin -refreshNodes
+   * @exception IOException 
+   */
+  private int refreshNodes() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    AdminOperationsProtocol adminOperationsProtocol = 
+      (AdminOperationsProtocol) 
+      RPC.getProxy(AdminOperationsProtocol.class, 
+                   AdminOperationsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             AdminOperationsProtocol.class));
+    
+    // Ask the jobtracker to refresh its node lists
+    adminOperationsProtocol.refreshNodes();
+    
+    return 0;
+  }
+  
   @Override
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
@@ -165,7 +201,8 @@ public class MRAdmin extends Configured 
     //
     // verify that we have enough command line parameters
     //
-    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd)) {
+    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd)
+        || "-refreshNodes".equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
         return exitCode;
@@ -178,6 +215,8 @@ public class MRAdmin extends Configured 
         exitCode = refreshAuthorizationPolicy();
       } else if ("-refreshQueueAcls".equals(cmd)) {
         exitCode = refreshQueueAcls();
+      } else if ("-refreshNodes".equals(cmd)) {
+        exitCode = refreshNodes();
       } else if ("-help".equals(cmd)) {
         if (i < args.length) {
           printUsage(args[i]);
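
The new flag slots in next to the existing ones; a minimal sketch of driving it programmatically, the command-line equivalent being "hadoop mradmin -refreshNodes":

import org.apache.hadoop.mapred.tools.MRAdmin;
import org.apache.hadoop.util.ToolRunner;

public class RefreshNodesDriver {
  public static void main(String[] args) throws Exception {
    // Equivalent to: hadoop mradmin -refreshNodes
    // The RPC is rejected unless the caller is the JobTracker owner or
    // belongs to mapred.permissions.supergroup.
    int exitCode = ToolRunner.run(new MRAdmin(), new String[] {"-refreshNodes"});
    System.exit(exitCode);
  }
}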

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/mapred-site.xml Fri Mar  4 03:24:12 2011
@@ -9,5 +9,10 @@
   <name>io.sort.mb</name>
   <value>10</value>
 </property>
+<property>
+  <name>mapred.hosts.exclude</name>
+  <value>hosts.exclude</value>
+  <description></description>
+</property>
 
 </configuration>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Fri Mar  4 03:24:12 2011
@@ -55,11 +55,11 @@ public class TestFsck extends TestCase {
     ByteArrayOutputStream bStream = new ByteArrayOutputStream();
     PrintStream newOut = new PrintStream(bStream, true);
     System.setOut(newOut);
-    ((Log4JLogger)PermissionChecker.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.ALL);
     int errCode = ToolRunner.run(new DFSck(conf), path);
     if (checkErrorCode)
       assertEquals(expectedErrCode, errCode);
-    ((Log4JLogger)PermissionChecker.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.INFO);
     System.setOut(oldOut);
     return bStream.toString();
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  4 03:24:12 2011
@@ -55,6 +55,7 @@ public class MiniMRCluster {
   private String namenode;
   private UnixUserGroupInformation ugi = null;
   private JobConf conf;
+  private int numTrackerToExclude;
     
   private JobConf job;
   
@@ -266,13 +267,16 @@ public class MiniMRCluster {
     JobClient client;
     try {
       client = new JobClient(job);
-      while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) {
+      ClusterStatus status = client.getClusterStatus();
+      while(status.getTaskTrackers() + numTrackerToExclude 
+            < taskTrackerList.size()) {
         for(TaskTrackerRunner runner : taskTrackerList) {
           if(runner.isDead) {
             throw new RuntimeException("TaskTracker is dead");
           }
         }
         Thread.sleep(1000);
+        status = client.getClusterStatus();
       }
     }
     catch (IOException ex) {
@@ -420,6 +424,14 @@ public class MiniMRCluster {
       int numTaskTrackers, String namenode, 
       int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
       JobConf conf) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, 
+         racks, hosts, ugi, conf, 0);
+  }
+  
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, 
+      int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
+      JobConf conf, int numTrackerToExclude) throws IOException {
     if (racks != null && racks.length < numTaskTrackers) {
       LOG.error("Invalid number of racks specified. It should be at least " +
           "equal to the number of tasktrackers");
@@ -454,6 +466,7 @@ public class MiniMRCluster {
     this.namenode = namenode;
     this.ugi = ugi;
     this.conf = conf; // this is the conf the mr starts with
+    this.numTrackerToExclude = numTrackerToExclude;
 
     // start the jobtracker
     startJobTracker();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Fri Mar  4 03:24:12 2011
@@ -32,16 +32,14 @@ import org.apache.hadoop.security.*;
  * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
  */
 public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
-  static final long now = System.currentTimeMillis();
   static final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
   static final UnixUserGroupInformation PI_UGI = createUGI("pi", false); 
   static final UnixUserGroupInformation WC_UGI = createUGI("wc", false); 
 
   static UnixUserGroupInformation createUGI(String name, boolean issuper) {
-    String username = name + now;
-    String group = issuper? "supergroup": username;
+    String group = issuper? "supergroup": name;
     return UnixUserGroupInformation.createImmutable(
-        new String[]{username, group});
+        new String[]{name, group});
   }
   
   static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=1076939&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java Fri Mar  4 03:24:12 2011
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * Test node decommissioning and recommissioning via refresh. Also check if the 
+ * nodes are decommissioned upon refresh. 
+ */
+public class TestNodeRefresh extends TestCase {
+  private String namenode = null;
+  private MiniDFSCluster dfs = null;
+  private MiniMRCluster mr = null;
+  private JobTracker jt = null;
+  private String[] hosts = null;
+  private String[] trackerHosts = null;
+  public static final Log LOG = 
+    LogFactory.getLog(TestNodeRefresh.class);
+  
+  private String getHostname(int i) {
+    return "host" + i + ".com";
+  }
+
+  private void startCluster(int numHosts, int numTrackerPerHost, 
+                            int numExcluded, Configuration conf) 
+  throws IOException {
+    try {
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      
+      // prepare hosts info
+      hosts = new String[numHosts];
+      for (int i = 1; i <= numHosts; ++i) {
+        hosts[i - 1] = getHostname(i);
+      }
+      
+      // start dfs
+      dfs = new MiniDFSCluster(conf, 1, true, null, hosts);
+      dfs.waitActive();
+      dfs.startDataNodes(conf, numHosts, true, null, null, hosts, null);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
+      (dfs.getFileSystem()).getUri().getPort(); 
+      
+      // create tracker hosts
+      trackerHosts = new String[numHosts * numTrackerPerHost];
+      for (int i = 1; i <= (numHosts * numTrackerPerHost); ++i) {
+        trackerHosts[i - 1] = getHostname(i);
+      }
+      
+      // start mini mr
+      JobConf jtConf = new JobConf(conf);
+      mr = new MiniMRCluster(0, 0, numHosts * numTrackerPerHost, namenode, 1, 
+                             null, trackerHosts, null, jtConf, 
+                             numExcluded * numTrackerPerHost);
+      
+      jt = mr.getJobTrackerRunner().getJobTracker();
+      
+      // check if trackers from all the desired hosts have connected
+      Set<String> hostsSeen = new HashSet<String>();
+      for (TaskTrackerStatus status : jt.taskTrackers()) {
+        hostsSeen.add(status.getHost());
+      }
+      assertEquals("Not all hosts are up", numHosts - numExcluded, 
+                   hostsSeen.size());
+    } catch (IOException ioe) {
+      stopCluster();
+    }
+  }
+
+  private void stopCluster() {
+    hosts = null;
+    trackerHosts = null;
+    if (dfs != null) { 
+      dfs.shutdown();
+      dfs = null;
+      namenode = null;
+    }
+    if (mr != null) { 
+      mr.shutdown();
+      mr = null;
+      jt = null;
+    }
+  }
+
+  private AdminOperationsProtocol getClient(Configuration conf, 
+                                                   UserGroupInformation ugi) 
+  throws IOException {
+    return (AdminOperationsProtocol)RPC.getProxy(AdminOperationsProtocol.class,
+        AdminOperationsProtocol.versionID, JobTracker.getAddress(conf), ugi, 
+        conf, NetUtils.getSocketFactory(conf, AdminOperationsProtocol.class));
+  }
+
+  /**
+   * Check default value of mapred.hosts.exclude. Also check if only 
+   * owner/supergroup user is allowed to run this command.
+   */
+  public void testMRRefreshDefault() throws IOException {  
+    // start a cluster with 2 hosts and no exclude-hosts file
+    Configuration conf = new Configuration();
+    conf.set("mapred.hosts.exclude", "");
+    startCluster(2, 1, 0, conf);
+
+    conf = mr.createJobConf(new JobConf(conf));
+
+    // refresh with wrong user
+    UserGroupInformation ugi_wrong =
+      TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
+    AdminOperationsProtocol client = getClient(conf, ugi_wrong);
+    boolean success = false;
+    try {
+      // Also try tool runner
+      client.refreshNodes();
+      success = true;
+    } catch (IOException ioe) {}
+    assertFalse("Invalid user performed privileged refresh operation", success);
+
+    // refresh with correct user
+    success = false;
+    String owner = ShellCommandExecutor.execCommand("whoami").trim();
+    UserGroupInformation ugi_correct =
+      TestMiniMRWithDFSWithDistinctUsers.createUGI(owner, false);
+    client = getClient(conf, ugi_correct);
+    try {
+      client.refreshNodes();
+      success = true;
+    } catch (IOException ioe){}
+    assertTrue("Privileged user denied permission for refresh operation",
+               success);
+
+    // refresh with super user
+    success = false;
+    UserGroupInformation ugi_super =
+      TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", true);
+    client = getClient(conf, ugi_super);
+    try {
+      client.refreshNodes();
+      success = true;
+    } catch (IOException ioe){}
+    assertTrue("Super user denied permission for refresh operation",
+               success);
+
+    // check the cluster status and tracker size
+    assertEquals("Trackers are lost upon refresh with empty hosts.exclude",
+                 2, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Excluded node count is incorrect",
+                 0, jt.getClusterStatus(false).getNumExcludedNodes());
+
+    // check that no host was disallowed
+    Set<String> hosts = new HashSet<String>();
+    for (TaskTrackerStatus status : jt.taskTrackers()) {
+      hosts.add(status.getHost());
+    }
+    assertEquals("Host is excluded upon refresh with empty hosts.exclude",
+                 2, hosts.size());
+
+    stopCluster();
+  }
+
+  /**
+   * Check refresh when a specific user is set in the conf along with the supergroup
+   */
+  public void testMRSuperUsers() throws IOException {  
+    // start a cluster with 1 host and specified superuser and supergroup
+    UnixUserGroupInformation ugi =
+      TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
+    Configuration conf = new Configuration();
+    UnixUserGroupInformation.saveToConf(conf, 
+        UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+    // set the supergroup
+    conf.set("mapred.permissions.supergroup", "abc");
+    startCluster(2, 1, 0, conf);
+
+    conf = mr.createJobConf(new JobConf(conf));
+
+    // refresh with wrong user
+    UserGroupInformation ugi_wrong =
+      TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
+    AdminOperationsProtocol client = getClient(conf, ugi_wrong);
+    boolean success = false;
+    try {
+      // Also try tool runner
+      client.refreshNodes();
+      success = true;
+    } catch (IOException ioe) {}
+    assertFalse("Invalid user performed privileged refresh operation", success);
+
+    // refresh with correct user
+    success = false;
+    client = getClient(conf, ugi);
+    try {
+      client.refreshNodes();
+      success = true;
+    } catch (IOException ioe){}
+    assertTrue("Privileged user denied permission for refresh operation",
+               success);
+
+    // refresh with super user
+    success = false;
+    UserGroupInformation ugi_super =
+      UnixUserGroupInformation.createImmutable(new String[]{"user3", "abc"});
+    client = getClient(conf, ugi_super);
+    try {
+      client.refreshNodes();
+      success = true;
+    } catch (IOException ioe){}
+    assertTrue("Super user denied permission for refresh operation",
+               success);
+
+    stopCluster();
+  }
+
+  /**
+   * Check node refresh for decommissioning. Check if an allowed host is 
+   * disallowed upon refresh. Also check if only owner/supergroup user is 
+   * allowed to fire this command.
+   */
+  public void testMRRefreshDecommissioning() throws IOException {
+    // start a cluster with 2 hosts and empty exclude-hosts file
+    Configuration conf = new Configuration();
+    File file = new File("hosts.exclude");
+    file.delete();
+    startCluster(2, 1, 0, conf);
+    String hostToDecommission = getHostname(1);
+    conf = mr.createJobConf(new JobConf(conf));
+
+    // change the exclude-hosts file to include one host
+    FileOutputStream out = new FileOutputStream(file);
+    LOG.info("Writing excluded nodes to log file " + file.toString());
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write( hostToDecommission + "\n"); // decommission first host
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      out.close();
+    }
+    file.deleteOnExit();
+
+    String owner = ShellCommandExecutor.execCommand("whoami").trim();
+    UserGroupInformation ugi_correct =
+      TestMiniMRWithDFSWithDistinctUsers.createUGI(owner, false);
+    AdminOperationsProtocol client = getClient(conf, ugi_correct);
+    try {
+      client.refreshNodes();
+    } catch (IOException ioe){}
+ 
+    // check the cluster status and tracker size
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Excluded node count is incorrect", 
+                 1, jt.getClusterStatus(false).getNumExcludedNodes());
+    
+    // check if the host is disallowed
+    for (TaskTrackerStatus status : jt.taskTrackers()) {
+      assertFalse("Tracker from decommissioned host still exist", 
+                  status.getHost().equals(hostToDecommission));
+    }
+    
+    stopCluster();
+  }
+
+  /**
+   * Check node refresh for recommissioning. Check if a disallowed host is 
+   * allowed upon refresh.
+   */
+  public void testMRRefreshRecommissioning() throws IOException {
+    String hostToInclude = getHostname(1);
+
+    // start a cluster with 2 hosts and exclude-hosts file having one hostname
+    Configuration conf = new Configuration();
+    
+    // create a exclude-hosts file to include one host
+    File file = new File("hosts.exclude");
+    file.delete();
+    FileOutputStream out = new FileOutputStream(file);
+    LOG.info("Writing excluded nodes to log file " + file.toString());
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write(hostToInclude + "\n"); // exclude first host
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      out.close();
+    }
+    
+    startCluster(2, 1, 1, conf);
+    
+    file.delete();
+
+    // change the exclude-hosts file to include no hosts
+    // note that this will also test hosts file with no content
+    out = new FileOutputStream(file);
+    LOG.info("Clearing hosts.exclude file " + file.toString());
+    writer = null;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write("\n");
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      out.close();
+    }
+    file.deleteOnExit();
+    
+    conf = mr.createJobConf(new JobConf(conf));
+
+    String owner = ShellCommandExecutor.execCommand("whoami").trim();
+    UserGroupInformation ugi_correct =  
+      TestMiniMRWithDFSWithDistinctUsers.createUGI(owner, false);
+    AdminOperationsProtocol client = getClient(conf, ugi_correct);
+    try {
+      client.refreshNodes();
+    } catch (IOException ioe){}
+
+    // start a tracker
+    mr.startTaskTracker(hostToInclude, null, 2, 1);
+
+    // wait for the tracker to join the jt
+    while  (jt.taskTrackers().size() < 2) {
+      UtilsForTests.waitFor(100);
+    }
+
+    assertEquals("Excluded node count is incorrect", 
+                 0, jt.getClusterStatus(false).getNumExcludedNodes());
+    
+    // check if the recommissioned host has rejoined
+    boolean seen = false;
+    for (TaskTrackerStatus status : jt.taskTrackers()) {
+      if(status.getHost().equals(hostToInclude)) {
+        seen = true;
+        break;
+      }
+    }
+    assertTrue("Tracker from excluded host doesnt exist", seen);
+    
+    stopCluster();
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp?rev=1076939&r1=1076938&r2=1076939&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp Fri Mar  4 03:24:12 2011
@@ -32,7 +32,8 @@
               "<th>Total Submissions</th>" +
               "<th>Nodes</th><th>Map Task Capacity</th>" +
               "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" + 
-              "<th>Blacklisted Nodes</th></tr>\n");
+              "<th>Blacklisted Nodes</th>" +
+              "<th>Excluded Nodes</th></tr>\n");
     out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
               status.getReduceTasks() + "</td><td>" + 
               tracker.getTotalSubmissions() +
@@ -43,6 +44,8 @@
               "</td><td>" + tasksPerNode +
               "</td><td><a href=\"machines.jsp?type=blacklisted\">" +
               status.getBlacklistedTrackers() + "</a>" +
+              "</td><td><a href=\"machines.jsp?type=excluded\">" +
+              status.getNumExcludedNodes() + "</a>" +
               "</td></tr></table>\n");
 
     out.print("<br>");


