Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4C8D4E2CF for ; Fri, 22 Feb 2013 15:57:56 +0000 (UTC) Received: (qmail 77744 invoked by uid 500); 22 Feb 2013 15:57:55 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 77397 invoked by uid 500); 22 Feb 2013 15:57:50 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 77346 invoked by uid 99); 22 Feb 2013 15:57:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Feb 2013 15:57:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Feb 2013 15:57:41 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id CE0782388900; Fri, 22 Feb 2013 15:57:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1449083 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Date: Fri, 22 Feb 2013 15:57:21 -0000 To: hdfs-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130222155721.CE0782388900@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: suresh Date: Fri Feb 22 15:57:21 
2013 New Revision: 1449083 URL: http://svn.apache.org/r1449083 Log: HDFS-4222. Merge r1448801 from trunk Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1449083&r1=1449082&r2=1449083&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Feb 22 15:57:21 2013 @@ -9,6 +9,8 @@ Release 0.23.7 - UNRELEASED via hairong) IMPROVEMENTS + HDFS-4222. NN is unresponsive and loses heartbeats from DNs when + configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh) OPTIMIZATIONS HDFS-2477. 
Optimize computing the diff between a block report and the Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1449083&r1=1449082&r2=1449083&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Feb 22 15:57:21 2013 @@ -244,6 +244,7 @@ public class FSNamesystem implements Nam static int BLOCK_DELETION_INCREMENT = 1000; private boolean isPermissionEnabled; private UserGroupInformation fsOwner; + private String fsOwnerShortUserName; private String supergroup; private PermissionStatus defaultPermission; @@ -475,6 +476,7 @@ public class FSNamesystem implements Nam private void setConfigurationParameters(Configuration conf) throws IOException { fsOwner = UserGroupInformation.getCurrentUser(); + fsOwnerShortUserName = fsOwner.getShortUserName(); LOG.info("fsOwner=" + fsOwner); @@ -578,9 +580,9 @@ public class FSNamesystem implements Nam * Dump all metadata into specified file */ void metaSave(String filename) throws IOException { + checkSuperuserPrivilege(); writeLock(); try { - checkSuperuserPrivilege(); File file = new File(System.getProperty("hadoop.log.dir"), filename); PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file, true))); @@ -628,12 +630,13 @@ public class FSNamesystem implements Nam throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { HdfsFileStatus resultingStat = null; + 
FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (isInSafeMode()) { throw new SafeModeException("Cannot set permission for " + src, safeMode); } - checkOwner(src); + checkOwner(pc, src); dir.setPermission(src, permission); if (auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(src, false); @@ -657,19 +660,19 @@ public class FSNamesystem implements Nam throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (isInSafeMode()) { throw new SafeModeException("Cannot set owner for " + src, safeMode); } - FSPermissionChecker pc = checkOwner(src); - if (!pc.isSuper) { - if (username != null && !pc.user.equals(username)) { - throw new AccessControlException("Non-super user cannot change owner."); + checkOwner(pc, src); + if (!pc.isSuperUser()) { + if (username != null && !pc.getUser().equals(username)) { + throw new AccessControlException("Non-super user cannot change owner"); } if (group != null && !pc.containsGroup(group)) { - throw new AccessControlException("User does not belong to " + group - + " ."); + throw new AccessControlException("User does not belong to " + group); } } dir.setOwner(src, username, group); @@ -710,8 +713,9 @@ public class FSNamesystem implements Nam LocatedBlocks getBlockLocations(String src, long offset, long length, boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException, UnresolvedLinkException, IOException { + FSPermissionChecker pc = getPermissionChecker(); if (isPermissionEnabled) { - checkPathAccess(src, FsAction.READ); + checkPathAccess(pc, src, FsAction.READ); } if (offset < 0) { @@ -821,12 +825,13 @@ public class FSNamesystem implements Nam } HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (isInSafeMode()) { throw new 
SafeModeException("Cannot concat " + target, safeMode); } - concatInternal(target, srcs); + concatInternal(pc, target, srcs); if (auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(target, false); } @@ -842,18 +847,18 @@ public class FSNamesystem implements Nam } /** See {@link #concat(String, String[])} */ - private void concatInternal(String target, String [] srcs) + private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) throws IOException, UnresolvedLinkException { assert hasWriteLock(); // write permission for the target if (isPermissionEnabled) { - checkPathAccess(target, FsAction.WRITE); + checkPathAccess(pc, target, FsAction.WRITE); // and srcs for(String aSrc: srcs) { - checkPathAccess(aSrc, FsAction.READ); // read the file - checkParentAccess(aSrc, FsAction.WRITE); // for delete + checkPathAccess(pc, aSrc, FsAction.READ); // read the file + checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete } } @@ -952,11 +957,12 @@ public class FSNamesystem implements Nam throw new IOException("Access time for hdfs is not configured. 
" + " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter."); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { // Write access is required to set access and modification times if (isPermissionEnabled) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } INode inode = dir.getINode(src); if (inode != null) { @@ -982,12 +988,13 @@ public class FSNamesystem implements Nam PermissionStatus dirPerms, boolean createParent) throws IOException, UnresolvedLinkException { HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (!createParent) { verifyParentDir(link); } - createSymlinkInternal(target, link, dirPerms, createParent); + createSymlinkInternal(pc, target, link, dirPerms, createParent); if (auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(link, false); } @@ -1005,8 +1012,8 @@ public class FSNamesystem implements Nam /** * Create a symbolic link. */ - private void createSymlinkInternal(String target, String link, - PermissionStatus dirPerms, boolean createParent) + private void createSymlinkInternal(FSPermissionChecker pc, String target, + String link, PermissionStatus dirPerms, boolean createParent) throws IOException, UnresolvedLinkException { assert hasWriteLock(); if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -1024,7 +1031,7 @@ public class FSNamesystem implements Nam +" either because the filename is invalid or the file exists"); } if (isPermissionEnabled) { - checkAncestorAccess(link, FsAction.WRITE); + checkAncestorAccess(pc, link, FsAction.WRITE); } // validate that we have enough inodes. 
checkFsObjectLimit(); @@ -1049,15 +1056,15 @@ public class FSNamesystem implements Nam boolean setReplication(final String src, final short replication ) throws IOException { blockManager.verifyReplication(src, replication, null); - final boolean isFile; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (isInSafeMode()) { throw new SafeModeException("Cannot set replication for " + src, safeMode); } if (isPermissionEnabled) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } final short[] oldReplication = new short[1]; @@ -1081,10 +1088,11 @@ public class FSNamesystem implements Nam long getPreferredBlockSize(String filename) throws IOException, UnresolvedLinkException { + FSPermissionChecker pc = getPermissionChecker(); readLock(); try { if (isPermissionEnabled) { - checkTraverse(filename); + checkTraverse(pc, filename); } return dir.getPreferredBlockSize(filename); } finally { @@ -1123,9 +1131,10 @@ public class FSNamesystem implements Nam short replication, long blockSize) throws AccessControlException, SafeModeException, FileAlreadyExistsException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, IOException { + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { - startFileInternal(src, permissions, holder, clientMachine, flag, + startFileInternal(pc, src, permissions, holder, clientMachine, flag, createParent, replication, blockSize); } finally { writeUnlock(); @@ -1157,7 +1166,7 @@ public class FSNamesystem implements Nam * * @return the last block locations if the block is partial or null otherwise */ - private LocatedBlock startFileInternal(String src, + private LocatedBlock startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize) throws SafeModeException, FileAlreadyExistsException, @@ -1190,9 +1199,9 @@ public class 
FSNamesystem implements Nam boolean append = flag.contains(CreateFlag.APPEND); if (isPermissionEnabled) { if (append || (overwrite && pathExists)) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } else { - checkAncestorAccess(src, FsAction.WRITE); + checkAncestorAccess(pc, src, FsAction.WRITE); } } @@ -1313,6 +1322,7 @@ public class FSNamesystem implements Nam */ boolean recoverLease(String src, String holder, String clientMachine) throws IOException { + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (isInSafeMode()) { @@ -1332,7 +1342,7 @@ public class FSNamesystem implements Nam return true; } if (isPermissionEnabled) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } recoverLeaseInternal(inode, src, holder, clientMachine, true); @@ -1435,9 +1445,10 @@ public class FSNamesystem implements Nam " Please refer to dfs.support.append configuration parameter."); } LocatedBlock lb = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { - lb = startFileInternal(src, null, holder, clientMachine, + lb = startFileInternal(pc, src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), false, blockManager.maxReplication, (long)0); } finally { @@ -1825,9 +1836,10 @@ public class FSNamesystem implements Nam NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { - status = renameToInternal(src, dst); + status = renameToInternal(pc, src, dst); if (status && auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(dst, false); } @@ -1845,7 +1857,7 @@ public class FSNamesystem implements Nam /** @deprecated See {@link #renameTo(String, String)} */ @Deprecated - private boolean renameToInternal(String src, String dst) + private boolean renameToInternal(FSPermissionChecker pc, String src, String dst) throws IOException, 
UnresolvedLinkException { assert hasWriteLock(); if (isInSafeMode()) { @@ -1861,8 +1873,8 @@ public class FSNamesystem implements Nam // of rewriting the dst String actualdst = dir.isDir(dst)? dst + Path.SEPARATOR + new Path(src).getName(): dst; - checkParentAccess(src, FsAction.WRITE); - checkAncestorAccess(actualdst, FsAction.WRITE); + checkParentAccess(pc, src, FsAction.WRITE); + checkAncestorAccess(pc, actualdst, FsAction.WRITE); } if (dir.renameTo(src, dst)) { @@ -1880,9 +1892,10 @@ public class FSNamesystem implements Nam NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - " + src + " to " + dst); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { - renameToInternal(src, dst, options); + renameToInternal(pc, src, dst, options); if (auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(dst, false); } @@ -1900,7 +1913,7 @@ public class FSNamesystem implements Nam } } - private void renameToInternal(String src, String dst, + private void renameToInternal(FSPermissionChecker pc, String src, String dst, Options.Rename... options) throws IOException { assert hasWriteLock(); if (isInSafeMode()) { @@ -1910,8 +1923,8 @@ public class FSNamesystem implements Nam throw new InvalidPathException("Invalid name: " + dst); } if (isPermissionEnabled) { - checkParentAccess(src, FsAction.WRITE); - checkAncestorAccess(dst, FsAction.WRITE); + checkParentAccess(pc, src, FsAction.WRITE); + checkAncestorAccess(pc, dst, FsAction.WRITE); } dir.renameTo(src, dst, options); @@ -1938,6 +1951,10 @@ public class FSNamesystem implements Nam return status; } + private FSPermissionChecker getPermissionChecker() + throws AccessControlException { + return new FSPermissionChecker(fsOwnerShortUserName, supergroup); + } /** * Remove a file/directory from the namespace. *

@@ -1954,7 +1971,7 @@ public class FSNamesystem implements Nam throws AccessControlException, SafeModeException, UnresolvedLinkException, IOException { ArrayList collectedBlocks = new ArrayList(); - + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { if (isInSafeMode()) { @@ -1964,7 +1981,7 @@ public class FSNamesystem implements Nam throw new IOException(src + " is non empty"); } if (enforcePermission && isPermissionEnabled) { - checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL); + checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL); } // Unlink the target directory from directory tree if (!dir.delete(src, collectedBlocks)) { @@ -2032,13 +2049,14 @@ public class FSNamesystem implements Nam */ HdfsFileStatus getFileInfo(String src, boolean resolveLink) throws AccessControlException, UnresolvedLinkException { + FSPermissionChecker pc = getPermissionChecker(); readLock(); try { if (!DFSUtil.isValidName(src)) { throw new InvalidPathException("Invalid file name: " + src); } if (isPermissionEnabled) { - checkTraverse(src); + checkTraverse(pc, src); } return dir.getFileInfo(src, resolveLink); } finally { @@ -2055,9 +2073,10 @@ public class FSNamesystem implements Nam if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { - status = mkdirsInternal(src, permissions, createParent); + status = mkdirsInternal(pc, src, permissions, createParent); } finally { writeUnlock(); } @@ -2074,7 +2093,7 @@ public class FSNamesystem implements Nam /** * Create all the necessary directories */ - private boolean mkdirsInternal(String src, + private boolean mkdirsInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, boolean createParent) throws IOException, UnresolvedLinkException { assert hasWriteLock(); @@ -2082,7 +2101,7 @@ public class FSNamesystem implements Nam throw new 
SafeModeException("Cannot create directory " + src, safeMode); } if (isPermissionEnabled) { - checkTraverse(src); + checkTraverse(pc, src); } if (dir.isDir(src)) { // all the users of mkdirs() are used to expect 'true' even if @@ -2093,7 +2112,7 @@ public class FSNamesystem implements Nam throw new InvalidPathException(src); } if (isPermissionEnabled) { - checkAncestorAccess(src, FsAction.WRITE); + checkAncestorAccess(pc, src, FsAction.WRITE); } if (!createParent) { verifyParentDir(src); @@ -2112,10 +2131,12 @@ public class FSNamesystem implements Nam ContentSummary getContentSummary(String src) throws AccessControlException, FileNotFoundException, UnresolvedLinkException { + FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName, + supergroup); readLock(); try { if (isPermissionEnabled) { - checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE); + checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE); } return dir.getContentSummary(src); } finally { @@ -2130,14 +2151,12 @@ public class FSNamesystem implements Nam */ void setQuota(String path, long nsQuota, long dsQuota) throws IOException, UnresolvedLinkException { + checkSuperuserPrivilege(); writeLock(); try { if (isInSafeMode()) { throw new SafeModeException("Cannot set quota on " + path, safeMode); } - if (isPermissionEnabled) { - checkSuperuserPrivilege(); - } dir.setQuota(path, nsQuota, dsQuota); } finally { writeUnlock(); @@ -2491,13 +2510,14 @@ public class FSNamesystem implements Nam boolean needLocation) throws AccessControlException, UnresolvedLinkException, IOException { DirectoryListing dl; + FSPermissionChecker pc = getPermissionChecker(); readLock(); try { if (isPermissionEnabled) { if (dir.isDir(src)) { - checkPathAccess(src, FsAction.READ_EXECUTE); + checkPathAccess(pc, src, FsAction.READ_EXECUTE); } else { - checkTraverse(src); + checkTraverse(pc, src); } } if (auditLog.isInfoEnabled() && isExternalInvocation()) { @@ -2790,9 +2810,9 @@ public class 
FSNamesystem implements Nam * @throws IOException if */ void saveNamespace() throws AccessControlException, IOException { + checkSuperuserPrivilege(); readLock(); try { - checkSuperuserPrivilege(); if (!isInSafeMode()) { throw new IOException("Safe mode should be turned ON " + "in order to create namespace image."); @@ -2811,9 +2831,9 @@ public class FSNamesystem implements Nam * @throws AccessControlException if superuser privilege is violated. */ boolean restoreFailedStorage(String arg) throws AccessControlException { + checkSuperuserPrivilege(); writeLock(); try { - checkSuperuserPrivilege(); // if it is disabled - enable it and vice versa. if(arg.equals("check")) @@ -2833,9 +2853,9 @@ public class FSNamesystem implements Nam } void finalizeUpgrade() throws IOException { + checkSuperuserPrivilege(); writeLock(); try { - checkSuperuserPrivilege(); getFSImage().finalizeUpgrade(); } finally { writeUnlock(); @@ -3494,6 +3514,7 @@ public class FSNamesystem implements Nam } CheckpointSignature rollEditLog() throws IOException { + checkSuperuserPrivilege(); writeLock(); try { if (isInSafeMode()) { @@ -3561,61 +3582,64 @@ public class FSNamesystem implements Nam return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission); } - private FSPermissionChecker checkOwner(String path - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, true, null, null, null, null); + private void checkOwner(FSPermissionChecker pc, String path) + throws AccessControlException, UnresolvedLinkException { + checkPermission(pc, path, true, null, null, null, null); } - private FSPermissionChecker checkPathAccess(String path, FsAction access - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, null, null, access, null); + private void checkPathAccess(FSPermissionChecker pc, + String path, FsAction access) throws AccessControlException, + UnresolvedLinkException { + checkPermission(pc, path, false, 
null, null, access, null); } - private FSPermissionChecker checkParentAccess(String path, FsAction access - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, null, access, null, null); + private void checkParentAccess(FSPermissionChecker pc, + String path, FsAction access) throws AccessControlException, + UnresolvedLinkException { + checkPermission(pc, path, false, null, access, null, null); } - private FSPermissionChecker checkAncestorAccess(String path, FsAction access - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, access, null, null, null); + private void checkAncestorAccess(FSPermissionChecker pc, + String path, FsAction access) throws AccessControlException, + UnresolvedLinkException { + checkPermission(pc, path, false, access, null, null, null); } - private FSPermissionChecker checkTraverse(String path - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, null, null, null, null); + private void checkTraverse(FSPermissionChecker pc, String path) + throws AccessControlException, UnresolvedLinkException { + checkPermission(pc, path, false, null, null, null, null); } @Override - public void checkSuperuserPrivilege() throws AccessControlException { + public void checkSuperuserPrivilege() + throws AccessControlException { if (isPermissionEnabled) { - FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup); + FSPermissionChecker pc = getPermissionChecker(); + pc.checkSuperuserPrivilege(); } } /** - * Check whether current user have permissions to access the path. - * For more details of the parameters, see - * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}. + * Check whether current user has permissions to access the path. For more + * details of the parameters, see + * {@link FSPermissionChecker#checkPermission()}. 
*/ - private FSPermissionChecker checkPermission(String path, boolean doCheckOwner, - FsAction ancestorAccess, FsAction parentAccess, FsAction access, - FsAction subAccess) throws AccessControlException, UnresolvedLinkException { - FSPermissionChecker pc = new FSPermissionChecker( - fsOwner.getShortUserName(), supergroup); - if (!pc.isSuper) { + private void checkPermission(FSPermissionChecker pc, + String path, boolean doCheckOwner, FsAction ancestorAccess, + FsAction parentAccess, FsAction access, FsAction subAccess) + throws AccessControlException, UnresolvedLinkException { + if (!pc.isSuperUser()) { dir.waitForReady(); readLock(); try { - pc.checkPermission(path, dir.rootDir, doCheckOwner, - ancestorAccess, parentAccess, access, subAccess); + pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess, + parentAccess, access, subAccess); } finally { readUnlock(); - } + } } - return pc; } - + /** * Check to see if we have exceeded the limit on the number * of inodes. @@ -4003,14 +4027,13 @@ public class FSNamesystem implements Nam */ Collection listCorruptFileBlocks(String path, String[] cookieTab) throws IOException { - + checkSuperuserPrivilege(); readLock(); try { if (!isPopulatingReplQueues()) { throw new IOException("Cannot run listCorruptFileBlocks because " + "replication queues have not been initialized."); } - checkSuperuserPrivilege(); // print a limited # of corrupt files per call int count = 0; ArrayList corruptFiles = new ArrayList(); Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1449083&r1=1449082&r2=1449083&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Feb 22 15:57:21 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.Stack; @@ -31,14 +32,20 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -/** Perform permission checking in {@link FSNamesystem}. */ +/** + * Class that helps in checking file system permission. + * The state of this class need not be synchronized as it has data structures that + * are read-only. + * + * Some of the helper methods are guarded by {@link FSNamesystem#readLock()}. + */ class FSPermissionChecker { static final Log LOG = LogFactory.getLog(UserGroupInformation.class); - private final UserGroupInformation ugi; - public final String user; - private final Set groups = new HashSet(); - public final boolean isSuper; + private final String user; + /** A set with group names. 
Not synchronized since it is unmodifiable */ + private final Set groups; + private final boolean isSuper; FSPermissionChecker(String fsOwner, String supergroup ) throws AccessControlException{ @@ -47,10 +54,9 @@ class FSPermissionChecker { } catch (IOException e) { throw new AccessControlException(e); } - - groups.addAll(Arrays.asList(ugi.getGroupNames())); + HashSet s = new HashSet(Arrays.asList(ugi.getGroupNames())); + groups = Collections.unmodifiableSet(s); user = ugi.getShortUserName(); - isSuper = user.equals(fsOwner) || groups.contains(supergroup); } @@ -60,20 +66,23 @@ class FSPermissionChecker { */ public boolean containsGroup(String group) {return groups.contains(group);} + public String getUser() { + return user; + } + + public boolean isSuperUser() { + return isSuper; + } + /** * Verify if the caller has the required permission. This will result into * an exception if the caller is not allowed to access the resource. - * @param owner owner of the system - * @param supergroup supergroup of the system */ - public static void checkSuperuserPrivilege(UserGroupInformation owner, - String supergroup) - throws AccessControlException { - FSPermissionChecker checker = - new FSPermissionChecker(owner.getShortUserName(), supergroup); - if (!checker.isSuper) { + public void checkSuperuserPrivilege() + throws AccessControlException { + if (!isSuper) { throw new AccessControlException("Access denied for user " - + checker.user + ". Superuser privilege is required"); + + user + ". Superuser privilege is required"); } } @@ -103,9 +112,11 @@ class FSPermissionChecker { * @param subAccess If path is a directory, * it is the access required of the path and all the sub-directories. * If path is not a directory, there is no effect. - * @return a PermissionChecker object which caches data for later use. * @throws AccessControlException * @throws UnresolvedLinkException + * + * Guarded by {@link FSNamesystem#readLock()} + * Caller of this method must hold that lock. 
*/ void checkPermission(String path, INodeDirectory root, boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess, FsAction access, @@ -148,6 +159,7 @@ class FSPermissionChecker { } } + /** Guarded by {@link FSNamesystem#readLock()} */ private void checkOwner(INode inode) throws AccessControlException { if (inode != null && user.equals(inode.getUserName())) { return; @@ -155,6 +167,7 @@ class FSPermissionChecker { throw new AccessControlException("Permission denied"); } + /** Guarded by {@link FSNamesystem#readLock()} */ private void checkTraverse(INode[] inodes, int last ) throws AccessControlException { for(int j = 0; j <= last; j++) { @@ -162,6 +175,7 @@ class FSPermissionChecker { } } + /** Guarded by {@link FSNamesystem#readLock()} */ private void checkSubAccess(INode inode, FsAction access ) throws AccessControlException { if (inode == null || !inode.isDirectory()) { @@ -181,11 +195,13 @@ class FSPermissionChecker { } } + /** Guarded by {@link FSNamesystem#readLock()} */ private void check(INode[] inodes, int i, FsAction access ) throws AccessControlException { check(i >= 0? inodes[i]: null, access); } + /** Guarded by {@link FSNamesystem#readLock()} */ private void check(INode inode, FsAction access ) throws AccessControlException { if (inode == null) { @@ -206,7 +222,9 @@ class FSPermissionChecker { + ", access=" + access + ", inode=" + inode); } - private void checkStickyBit(INode parent, INode inode) throws AccessControlException { + /** Guarded by {@link FSNamesystem#readLock()} */ + private void checkStickyBit(INode parent, INode inode) + throws AccessControlException { if(!parent.getFsPermission().getStickyBit()) { return; }