hadoop-hdfs-commits mailing list archives

From: cnaur...@apache.org
Subject: svn commit: r1569870 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/...
Date: Wed, 19 Feb 2014 18:34:55 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java Wed Feb 19 18:34:52 2014
@@ -30,6 +30,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -38,15 +42,18 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.AclFeatureProto;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 
 @InterfaceAudience.Private
@@ -54,6 +61,20 @@ public final class FSImageFormatPBINode 
   private final static long USER_GROUP_STRID_MASK = (1 << 24) - 1;
   private final static int USER_STRID_OFFSET = 40;
   private final static int GROUP_STRID_OFFSET = 16;
+
+  private static final int ACL_ENTRY_NAME_MASK = (1 << 24) - 1;
+  private static final int ACL_ENTRY_NAME_OFFSET = 6;
+  private static final int ACL_ENTRY_TYPE_OFFSET = 3;
+  private static final int ACL_ENTRY_SCOPE_OFFSET = 5;
+  private static final int ACL_ENTRY_PERM_MASK = 7;
+  private static final int ACL_ENTRY_TYPE_MASK = 3;
+  private static final int ACL_ENTRY_SCOPE_MASK = 1;
+  private static final FsAction[] FSACTION_VALUES = FsAction.values();
+  private static final AclEntryScope[] ACL_ENTRY_SCOPE_VALUES = AclEntryScope
+      .values();
+  private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType
+      .values();
+
   private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
 
   public final static class Loader {
@@ -66,13 +87,30 @@ public final class FSImageFormatPBINode 
           new FsPermission(perm));
     }
 
+    public static ImmutableList<AclEntry> loadAclEntries(
+        AclFeatureProto proto, final String[] stringTable) {
+      ImmutableList.Builder<AclEntry> b = ImmutableList.builder();
+      for (int v : proto.getEntriesList()) {
+        int p = v & ACL_ENTRY_PERM_MASK;
+        int t = (v >> ACL_ENTRY_TYPE_OFFSET) & ACL_ENTRY_TYPE_MASK;
+        int s = (v >> ACL_ENTRY_SCOPE_OFFSET) & ACL_ENTRY_SCOPE_MASK;
+        int nid = (v >> ACL_ENTRY_NAME_OFFSET) & ACL_ENTRY_NAME_MASK;
+        String name = stringTable[nid];
+        b.add(new AclEntry.Builder().setName(name)
+            .setPermission(FSACTION_VALUES[p])
+            .setScope(ACL_ENTRY_SCOPE_VALUES[s])
+            .setType(ACL_ENTRY_TYPE_VALUES[t]).build());
+      }
+      return b.build();
+    }
+
     public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
-        final String[] stringTable) {
+        LoaderContext state) {
       assert n.getType() == INodeSection.INode.Type.DIRECTORY;
       INodeSection.INodeDirectory d = n.getDirectory();
 
       final PermissionStatus permissions = loadPermission(d.getPermission(),
-          stringTable);
+          state.getStringTable());
       final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
           .toByteArray(), permissions, d.getModificationTime());
 
@@ -80,6 +118,11 @@ public final class FSImageFormatPBINode 
       if (nsQuota >= 0 || dsQuota >= 0) {
         dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
       }
+
+      if (d.hasAcl()) {
+        dir.addAclFeature(new AclFeature(loadAclEntries(d.getAcl(),
+            state.getStringTable())));
+      }
       return dir;
     }
 
@@ -181,7 +224,7 @@ public final class FSImageFormatPBINode 
       case FILE:
         return loadINodeFile(n);
       case DIRECTORY:
-        return loadINodeDirectory(n, parent.getLoaderContext().getStringTable());
+        return loadINodeDirectory(n, parent.getLoaderContext());
       case SYMLINK:
         return loadINodeSymlink(n);
       default:
@@ -195,6 +238,7 @@ public final class FSImageFormatPBINode 
       INodeSection.INodeFile f = n.getFile();
       List<BlockProto> bp = f.getBlocksList();
       short replication = (short) f.getReplication();
+      LoaderContext state = parent.getLoaderContext();
 
       BlockInfo[] blocks = new BlockInfo[bp.size()];
       for (int i = 0, e = bp.size(); i < e; ++i) {
@@ -206,6 +250,12 @@ public final class FSImageFormatPBINode 
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
           f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+
+      if (f.hasAcl()) {
+        file.addAclFeature(new AclFeature(loadAclEntries(f.getAcl(),
+            state.getStringTable())));
+      }
+
       // under-construction information
       if (f.hasFileUC()) {
         INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
@@ -227,13 +277,15 @@ public final class FSImageFormatPBINode 
       INodeSection.INodeSymlink s = n.getSymlink();
       final PermissionStatus permissions = loadPermission(s.getPermission(),
           parent.getLoaderContext().getStringTable());
-      return new INodeSymlink(n.getId(), n.getName().toByteArray(), permissions,
-          0, 0, s.getTarget().toStringUtf8());
+
+      INodeSymlink sym = new INodeSymlink(n.getId(), n.getName().toByteArray(),
+          permissions, 0, 0, s.getTarget().toStringUtf8());
+
+      return sym;
     }
 
     private void loadRootINode(INodeSection.INode p) {
-      INodeDirectory root = loadINodeDirectory(p, parent.getLoaderContext()
-          .getStringTable());
+      INodeDirectory root = loadINodeDirectory(p, parent.getLoaderContext());
       final Quota.Counts q = root.getQuotaCounts();
       final long nsQuota = q.get(Quota.NAMESPACE);
       final long dsQuota = q.get(Quota.DISKSPACE);
@@ -255,27 +307,48 @@ public final class FSImageFormatPBINode 
           | n.getFsPermissionShort();
     }
 
+    private static AclFeatureProto.Builder buildAclEntries(AclFeature f,
+        final SaverContext.DeduplicationMap<String> map) {
+      AclFeatureProto.Builder b = AclFeatureProto.newBuilder();
+      for (AclEntry e : f.getEntries()) {
+        int v = ((map.getId(e.getName()) & ACL_ENTRY_NAME_MASK) << ACL_ENTRY_NAME_OFFSET)
+            | (e.getType().ordinal() << ACL_ENTRY_TYPE_OFFSET)
+            | (e.getScope().ordinal() << ACL_ENTRY_SCOPE_OFFSET)
+            | (e.getPermission().ordinal());
+        b.addEntries(v);
+      }
+      return b;
+    }
+
     public static INodeSection.INodeFile.Builder buildINodeFile(
-        INodeFileAttributes file,
-        final SaverContext.DeduplicationMap<String> stringMap) {
+        INodeFileAttributes file, final SaverContext state) {
       INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
           .setAccessTime(file.getAccessTime())
           .setModificationTime(file.getModificationTime())
-          .setPermission(buildPermissionStatus(file, stringMap))
+          .setPermission(buildPermissionStatus(file, state.getStringMap()))
           .setPreferredBlockSize(file.getPreferredBlockSize())
           .setReplication(file.getFileReplication());
+
+      AclFeature f = file.getAclFeature();
+      if (f != null) {
+        b.setAcl(buildAclEntries(f, state.getStringMap()));
+      }
       return b;
     }
 
     public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
-        INodeDirectoryAttributes dir,
-        final SaverContext.DeduplicationMap<String> stringMap) {
+        INodeDirectoryAttributes dir, final SaverContext state) {
       Quota.Counts quota = dir.getQuotaCounts();
       INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
           .newBuilder().setModificationTime(dir.getModificationTime())
           .setNsQuota(quota.get(Quota.NAMESPACE))
           .setDsQuota(quota.get(Quota.DISKSPACE))
-          .setPermission(buildPermissionStatus(dir, stringMap));
+          .setPermission(buildPermissionStatus(dir, state.getStringMap()));
+
+      AclFeature f = dir.getAclFeature();
+      if (f != null) {
+        b.setAcl(buildAclEntries(f, state.getStringMap()));
+      }
       return b;
     }
 
@@ -376,7 +449,7 @@ public final class FSImageFormatPBINode 
 
     private void save(OutputStream out, INodeDirectory n) throws IOException {
       INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
-          parent.getSaverContext().getStringMap());
+          parent.getSaverContext());
       INodeSection.INode r = buildINodeCommon(n)
           .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
       r.writeDelimitedTo(out);
@@ -384,7 +457,7 @@ public final class FSImageFormatPBINode 
 
     private void save(OutputStream out, INodeFile n) throws IOException {
       INodeSection.INodeFile.Builder b = buildINodeFile(n,
-          parent.getSaverContext().getStringMap());
+          parent.getSaverContext());
 
       for (Block block : n.getBlocks()) {
         b.addBlocks(PBHelper.convert(block));
@@ -405,10 +478,12 @@ public final class FSImageFormatPBINode 
     }
 
     private void save(OutputStream out, INodeSymlink n) throws IOException {
+      SaverContext state = parent.getSaverContext();
       INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
           .newBuilder()
-          .setPermission(buildPermissionStatus(n, parent.getSaverContext().getStringMap()))
+          .setPermission(buildPermissionStatus(n, state.getStringMap()))
           .setTarget(ByteString.copyFrom(n.getSymlink()));
+
       INodeSection.INode r = buildINodeCommon(n)
           .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
       r.writeDelimitedTo(out);
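
The loadAclEntries and buildAclEntries hunks above round-trip every ACL entry through a single int in the fsimage: the low three bits carry the FsAction ordinal, bits 3-4 the AclEntryType ordinal, bit 5 the AclEntryScope, and the next 24 bits the string-table id of the entry name. A minimal standalone sketch of that layout, with the masks and offsets copied from FSImageFormatPBINode above; the name id and the enum ordinals shown are assumptions of the example:

    public class AclEntryPacking {
      // Masks and offsets mirroring FSImageFormatPBINode above.
      static final int PERM_MASK = 7;             // bits 0-2: FsAction ordinal
      static final int TYPE_OFFSET = 3;           // bits 3-4: AclEntryType ordinal
      static final int TYPE_MASK = 3;
      static final int SCOPE_OFFSET = 5;          // bit 5: AclEntryScope ordinal
      static final int SCOPE_MASK = 1;
      static final int NAME_OFFSET = 6;           // bits 6-29: string-table id
      static final int NAME_MASK = (1 << 24) - 1;

      static int pack(int nameId, int type, int scope, int perm) {
        return ((nameId & NAME_MASK) << NAME_OFFSET)
            | (type << TYPE_OFFSET)
            | (scope << SCOPE_OFFSET)
            | perm;
      }

      public static void main(String[] args) {
        // user:alice:rwx in the ACCESS scope: assumes USER and ACCESS have
        // ordinal 0 and FsAction.ALL (rwx) has ordinal 7; name id 5 is made up.
        int v = pack(5, 0, 0, 7);
        System.out.println("perm   = " + (v & PERM_MASK));                    // 7
        System.out.println("type   = " + ((v >> TYPE_OFFSET) & TYPE_MASK));   // 0
        System.out.println("scope  = " + ((v >> SCOPE_OFFSET) & SCOPE_MASK)); // 0
        System.out.println("nameId = " + ((v >> NAME_OFFSET) & NAME_MASK));   // 5
      }
    }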

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java Wed Feb 19 18:34:52 2014
@@ -116,9 +116,11 @@ public final class FSImageFormatProtobuf
         return map.entrySet();
       }
     }
-    private final DeduplicationMap<String> stringMap = DeduplicationMap.newMap();
     private final ArrayList<INodeReference> refList = Lists.newArrayList();
 
+    private final DeduplicationMap<String> stringMap = DeduplicationMap
+        .newMap();
+
     public DeduplicationMap<String> getStringMap() {
       return stringMap;
     }
@@ -547,6 +549,7 @@ public final class FSImageFormatProtobuf
   public enum SectionName {
     NS_INFO("NS_INFO"),
     STRING_TABLE("STRING_TABLE"),
+    EXTENDED_ACL("EXTENDED_ACL"),
     INODE("INODE"),
     INODE_REFERENCE("INODE_REFERENCE"),
     SNAPSHOT("SNAPSHOT"),
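
Both the permission words and the packed ACL entries above refer to names by id through the saver's DeduplicationMap, so each distinct user, group, and ACL-entry name is written to the image's STRING_TABLE section exactly once. A toy sketch of that interning scheme; the exact id numbering (0 reserved for null, real ids from 1) is an assumption of this sketch:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DeduplicationSketch {
      private final Map<String, Integer> map = new LinkedHashMap<>();

      // Assumed numbering: 0 is reserved for null, real ids start at 1.
      int getId(String value) {
        if (value == null) {
          return 0;
        }
        Integer id = map.get(value);
        if (id == null) {
          id = map.size() + 1;
          map.put(value, id);
        }
        return id;
      }

      public static void main(String[] args) {
        DeduplicationSketch s = new DeduplicationSketch();
        System.out.println(s.getId("hadoop"));  // 1
        System.out.println(s.getId("bruce"));   // 2
        System.out.println(s.getId("hadoop"));  // 1: deduplicated
        System.out.println(s.getId(null));      // 0: "no name"
      }
    }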

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 19 18:34:52 2014
@@ -142,6 +142,8 @@ import org.apache.hadoop.fs.Options.Rena
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -504,7 +506,9 @@ public class FSNamesystem implements Nam
   private INodeId inodeId;
   
   private final RetryCache retryCache;
-  
+
+  private final AclConfigFlag aclConfigFlag;
+
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded. 
    */
@@ -775,6 +779,7 @@ public class FSNamesystem implements Nam
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
       this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
+      this.aclConfigFlag = new AclConfigFlag(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -7388,6 +7393,123 @@ public class FSNamesystem implements Nam
     return results;
   }
 
+  void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+    aclConfigFlag.checkForApiCall();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.modifyAclEntries(src, aclSpec);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "modifyAclEntries", src, null, resultingStat);
+  }
+
+  void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+    aclConfigFlag.checkForApiCall();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.removeAclEntries(src, aclSpec);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeAclEntries", src, null, resultingStat);
+  }
+
+  void removeDefaultAcl(String src) throws IOException {
+    aclConfigFlag.checkForApiCall();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.removeDefaultAcl(src);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeDefaultAcl", src, null, resultingStat);
+  }
+
+  void removeAcl(String src) throws IOException {
+    aclConfigFlag.checkForApiCall();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot remove ACL on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.removeAcl(src);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeAcl", src, null, resultingStat);
+  }
+
+  void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+    aclConfigFlag.checkForApiCall();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot set ACL on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.setAcl(src, aclSpec);
+      resultingStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "setAcl", src, null, resultingStat);
+  }
+
+  AclStatus getAclStatus(String src) throws IOException {
+    aclConfigFlag.checkForApiCall();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return dir.getAclStatus(src);
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the
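
Each of the six new FSNamesystem methods above follows the same template: gate on aclConfigFlag.checkForApiCall(), take the namesystem lock, re-check the operation category and safe mode under the lock, resolve the path, enforce ownership, delegate to FSDirectory, then sync the edit log and write the audit event outside the lock. On a client these land through the corresponding FileSystem ACL calls; the following is a minimal sketch, assuming the matching client-side methods from this feature branch are present and that the NameNode was started with ACLs enabled (otherwise checkForApiCall() rejects the request). The path and user name are hypothetical.

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.AclEntry;
    import org.apache.hadoop.fs.permission.AclEntryScope;
    import org.apache.hadoop.fs.permission.AclEntryType;
    import org.apache.hadoop.fs.permission.AclStatus;
    import org.apache.hadoop.fs.permission.FsAction;

    public class AclClientSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/data/reports");  // hypothetical path

        // Grant the (hypothetical) user "bruce" read-execute through a named
        // access entry; the builder mirrors the one used by loadAclEntries.
        List<AclEntry> aclSpec = Collections.singletonList(
            new AclEntry.Builder()
                .setScope(AclEntryScope.ACCESS)
                .setType(AclEntryType.USER)
                .setName("bruce")
                .setPermission(FsAction.READ_EXECUTE)
                .build());

        fs.modifyAclEntries(path, aclSpec);       // -> FSNamesystem.modifyAclEntries
        AclStatus status = fs.getAclStatus(path); // -> FSNamesystem.getAclStatus
        System.out.println(status);
        fs.removeAcl(path);                       // -> FSNamesystem.removeAcl
      }
    }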

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Wed Feb 19 18:34:52 2014
@@ -20,16 +20,21 @@ package org.apache.hadoop.hdfs.server.na
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /** 
  * Class that helps in checking file system permission.
@@ -42,12 +47,27 @@ class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
   /** @return a string for throwing {@link AccessControlException} */
-  private static String toAccessControlString(INode inode) {
-    return "\"" + inode.getFullPathName() + "\":"
-          + inode.getUserName() + ":" + inode.getGroupName()
-          + ":" + (inode.isDirectory()? "d": "-") + inode.getFsPermission();
+  private String toAccessControlString(INode inode, int snapshotId,
+      FsAction access, FsPermission mode) {
+    return toAccessControlString(inode, snapshotId, access, mode, null);
   }
 
+  /** @return a string for throwing {@link AccessControlException} */
+  private String toAccessControlString(INode inode, int snapshotId,
+      FsAction access, FsPermission mode, List<AclEntry> featureEntries) {
+    StringBuilder sb = new StringBuilder("Permission denied: ")
+      .append("user=").append(user).append(", ")
+      .append("access=").append(access).append(", ")
+      .append("inode=\"").append(inode.getFullPathName()).append("\":")
+      .append(inode.getUserName(snapshotId)).append(':')
+      .append(inode.getGroupName(snapshotId)).append(':')
+      .append(inode.isDirectory() ? 'd' : '-')
+      .append(mode);
+    if (featureEntries != null) {
+      sb.append(':').append(StringUtils.join(",", featureEntries));
+    }
+    return sb.toString();
+  }
 
   private final UserGroupInformation ugi;
   private final String user;  
@@ -219,7 +239,20 @@ class FSPermissionChecker {
       return;
     }
     FsPermission mode = inode.getFsPermission(snapshotId);
+    AclFeature aclFeature = inode.getAclFeature(snapshotId);
+    if (aclFeature != null) {
+      List<AclEntry> featureEntries = aclFeature.getEntries();
+      // It's possible that the inode has a default ACL but no access ACL.
+      if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
+        checkAccessAcl(inode, snapshotId, access, mode, featureEntries);
+        return;
+      }
+    }
+    checkFsPermission(inode, snapshotId, access, mode);
+  }
 
+  private void checkFsPermission(INode inode, int snapshotId, FsAction access,
+      FsPermission mode) throws AccessControlException {
     if (user.equals(inode.getUserName(snapshotId))) { //user class
       if (mode.getUserAction().implies(access)) { return; }
     }
@@ -229,8 +262,88 @@ class FSPermissionChecker {
     else { //other class
       if (mode.getOtherAction().implies(access)) { return; }
     }
-    throw new AccessControlException("Permission denied: user=" + user
-        + ", access=" + access + ", inode=" + toAccessControlString(inode));
+    throw new AccessControlException(
+      toAccessControlString(inode, snapshotId, access, mode));
+  }
+
+  /**
+   * Checks requested access against an Access Control List.  This method relies
+   * on finding the ACL data in the relevant portions of {@link FsPermission} and
+   * {@link AclFeature} as implemented in the logic of {@link AclStorage}.  This
+   * method also relies on receiving the ACL entries in sorted order.  This is
+   * assumed to be true, because the ACL modification methods in
+   * {@link AclTransformation} sort the resulting entries.
+   *
+   * More specifically, this method depends on these invariants in an ACL:
+   * - The list must be sorted.
+   * - Each entry in the list must be unique by scope + type + name.
+   * - There is exactly one each of the unnamed user/group/other entries.
+   * - The mask entry must not have a name.
+   * - The other entry must not have a name.
+   * - Default entries may be present, but they are ignored during enforcement.
+   *
+   * @param inode INode accessed inode
+   * @param snapshotId int snapshot ID
+   * @param access FsAction requested permission
+   * @param mode FsPermission mode from inode
+   * @param featureEntries List<AclEntry> ACL entries from AclFeature of inode
+   * @throws AccessControlException if the ACL denies permission
+   */
+  private void checkAccessAcl(INode inode, int snapshotId, FsAction access,
+      FsPermission mode, List<AclEntry> featureEntries)
+      throws AccessControlException {
+    boolean foundMatch = false;
+
+    // Use owner entry from permission bits if user is owner.
+    if (user.equals(inode.getUserName(snapshotId))) {
+      if (mode.getUserAction().implies(access)) {
+        return;
+      }
+      foundMatch = true;
+    }
+
+    // Check named user and group entries if user was not denied by owner entry.
+    if (!foundMatch) {
+      for (AclEntry entry: featureEntries) {
+        if (entry.getScope() == AclEntryScope.DEFAULT) {
+          break;
+        }
+        AclEntryType type = entry.getType();
+        String name = entry.getName();
+        if (type == AclEntryType.USER) {
+          // Use named user entry with mask from permission bits applied if user
+          // matches name.
+          if (user.equals(name)) {
+            FsAction masked = entry.getPermission().and(mode.getGroupAction());
+            if (masked.implies(access)) {
+              return;
+            }
+            foundMatch = true;
+          }
+        } else if (type == AclEntryType.GROUP) {
+          // Use group entry (unnamed or named) with mask from permission bits
+          // applied if user is a member and entry grants access.  If user is a
+          // member of multiple groups that have entries that grant access, then
+          // it doesn't matter which is chosen, so exit early after first match.
+          String group = name == null ? inode.getGroupName(snapshotId) : name;
+          if (groups.contains(group)) {
+            FsAction masked = entry.getPermission().and(mode.getGroupAction());
+            if (masked.implies(access)) {
+              return;
+            }
+            foundMatch = true;
+          }
+        }
+      }
+    }
+
+    // Use other entry if user was not denied by an earlier match.
+    if (!foundMatch && mode.getOtherAction().implies(access)) {
+      return;
+    }
+
+    throw new AccessControlException(
+      toAccessControlString(inode, snapshotId, access, mode, featureEntries));
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
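
The pivotal step in checkAccessAcl above is the masking of named entries: per the AclStorage layout this method relies on, an inode with an extended ACL stores the ACL mask in the group triad of its permission bits, so the effective right of a named user or group entry is the entry's permission ANDed with mode.getGroupAction(). A small self-contained illustration of that step:

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class AclMaskExample {
      public static void main(String[] args) {
        // rwxr-x---: with an extended ACL present, the group triad (r-x)
        // is the ACL mask, not the group permission.
        FsPermission mode = new FsPermission((short) 0750);
        FsAction namedUserEntry = FsAction.READ_WRITE;       // user:bruce:rw-
        FsAction masked = namedUserEntry.and(mode.getGroupAction());
        // The mask r-x strips the write bit: effective access is r--.
        System.out.println(masked);                          // READ
        System.out.println(masked.implies(FsAction.READ));   // true
        System.out.println(masked.implies(FsAction.WRITE));  // false
      }
    }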

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Feb 19 18:34:52 2014
@@ -154,6 +154,31 @@ public abstract class INode implements I
     return nodeToUpdate;
   }
 
+  abstract AclFeature getAclFeature(int snapshotId);
+
+  @Override
+  public final AclFeature getAclFeature() {
+    return getAclFeature(Snapshot.CURRENT_STATE_ID);
+  }
+
+  abstract void addAclFeature(AclFeature aclFeature);
+
+  final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId)
+      throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
+    nodeToUpdate.addAclFeature(aclFeature);
+    return nodeToUpdate;
+  }
+
+  abstract void removeAclFeature();
+
+  final INode removeAclFeature(int latestSnapshotId)
+      throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
+    nodeToUpdate.removeAclFeature();
+    return nodeToUpdate;
+  }
+  
   /**
    * @return if the given snapshot id is {@link Snapshot#CURRENT_STATE_ID},
    *         return this; otherwise return the corresponding snapshot inode.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java Wed Feb 19 18:34:52 2014
@@ -48,6 +48,9 @@ public interface INodeAttributes {
   /** @return the permission information as a long. */
   public long getPermissionLong();
 
+  /** @return the ACL feature. */
+  public AclFeature getAclFeature();
+
   /** @return the modification time. */
   public long getModificationTime();
 
@@ -58,13 +61,15 @@ public interface INodeAttributes {
   public static abstract class SnapshotCopy implements INodeAttributes {
     private final byte[] name;
     private final long permission;
+    private final AclFeature aclFeature;
     private final long modificationTime;
     private final long accessTime;
 
     SnapshotCopy(byte[] name, PermissionStatus permissions,
-        long modificationTime, long accessTime) {
+        AclFeature aclFeature, long modificationTime, long accessTime) {
       this.name = name;
       this.permission = PermissionStatusFormat.toLong(permissions);
+      this.aclFeature = aclFeature;
       this.modificationTime = modificationTime;
       this.accessTime = accessTime;
     }
@@ -72,6 +77,7 @@ public interface INodeAttributes {
     SnapshotCopy(INode inode) {
       this.name = inode.getLocalNameBytes();
       this.permission = inode.getPermissionLong();
+      this.aclFeature = inode.getAclFeature();
       this.modificationTime = inode.getModificationTime();
       this.accessTime = inode.getAccessTime();
     }
@@ -109,6 +115,11 @@ public interface INodeAttributes {
     }
 
     @Override
+    public AclFeature getAclFeature() {
+      return aclFeature;
+    }
+
+    @Override
     public final long getModificationTime() {
       return modificationTime;
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Feb 19 18:34:52 2014
@@ -77,8 +77,11 @@ public class INodeDirectory extends INod
    * @param other The INodeDirectory to be copied
    * @param adopt Indicate whether or not need to set the parent field of child
    *              INodes to the new node
+   * @param featuresToCopy any number of features to copy to the new node.
+   *              The method will do a reference copy, not a deep copy.
    */
-  public INodeDirectory(INodeDirectory other, boolean adopt, boolean copyFeatures) {
+  public INodeDirectory(INodeDirectory other, boolean adopt,
+      Feature... featuresToCopy) {
     super(other);
     this.children = other.children;
     if (adopt && this.children != null) {
@@ -86,9 +89,7 @@ public class INodeDirectory extends INod
         child.setParent(this);
       }
     }
-    if (copyFeatures) {
-      this.features = other.features;
-    }
+    this.features = featuresToCopy;
   }
 
   /** @return true unconditionally. */
@@ -145,12 +146,7 @@ public class INodeDirectory extends INod
    * otherwise, return null.
    */
   public final DirectoryWithQuotaFeature getDirectoryWithQuotaFeature() {
-    for (Feature f : features) {
-      if (f instanceof DirectoryWithQuotaFeature) {
-        return (DirectoryWithQuotaFeature)f;
-      }
-    }
-    return null;
+    return getFeature(DirectoryWithQuotaFeature.class);
   }
 
   /** Does this directory have a quota? */
@@ -185,12 +181,7 @@ public class INodeDirectory extends INod
    * otherwise, return null.
    */
   public final DirectoryWithSnapshotFeature getDirectoryWithSnapshotFeature() {
-    for (Feature f : features) {
-      if (f instanceof DirectoryWithSnapshotFeature) {
-        return (DirectoryWithSnapshotFeature) f;
-      }
-    }
-    return null;
+    return getFeature(DirectoryWithSnapshotFeature.class);
   }
 
   /** Does this file have the snapshot feature? */
@@ -231,7 +222,8 @@ public class INodeDirectory extends INod
   public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
     Preconditions.checkState(getClass() != INodeDirectory.class,
         "the class is already INodeDirectory, this=%s", this);
-    return replaceSelf(new INodeDirectory(this, true, true), inodeMap);
+    return replaceSelf(new INodeDirectory(this, true, this.getFeatures()),
+      inodeMap);
   }
 
   /** Replace itself with the given directory. */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java Wed Feb 19 18:34:52 2014
@@ -35,8 +35,8 @@ public interface INodeDirectoryAttribute
   public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
       implements INodeDirectoryAttributes {
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
-        long modificationTime) {
-      super(name, permissions, modificationTime, 0L);
+        AclFeature aclFeature, long modificationTime) {
+      super(name, permissions, aclFeature, modificationTime, 0L);
     }
 
     public SnapshotCopy(INodeDirectory dir) {
@@ -62,8 +62,9 @@ public interface INodeDirectoryAttribute
 
 
     public CopyWithQuota(byte[] name, PermissionStatus permissions,
-        long modificationTime, long nsQuota, long dsQuota) {
-      super(name, permissions, modificationTime);
+        AclFeature aclFeature, long modificationTime, long nsQuota,
+        long dsQuota) {
+      super(name, permissions, aclFeature, modificationTime);
       this.nsQuota = nsQuota;
       this.dsQuota = dsQuota;
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Feb 19 18:34:52 2014
@@ -151,12 +151,7 @@ public class INodeFile extends INodeWith
    * otherwise, return null.
    */
   public final FileUnderConstructionFeature getFileUnderConstructionFeature() {
-    for (Feature f : features) {
-      if (f instanceof FileUnderConstructionFeature) {
-        return (FileUnderConstructionFeature) f;
-      }
-    }
-    return null;
+    return getFeature(FileUnderConstructionFeature.class);
   }
 
   /** Is this file under construction? */
@@ -265,12 +260,7 @@ public class INodeFile extends INodeWith
    * otherwise, return null.
    */
   public final FileWithSnapshotFeature getFileWithSnapshotFeature() {
-    for (Feature f: features) {
-      if (f instanceof FileWithSnapshotFeature) {
-        return (FileWithSnapshotFeature) f;
-      }
-    }
-    return null;
+    return getFeature(FileWithSnapshotFeature.class);
   }
 
   /** Does this file have the snapshot feature? */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java Wed Feb 19 18:34:52 2014
@@ -41,9 +41,9 @@ public interface INodeFileAttributes ext
     private final long header;
 
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
-        long modificationTime, long accessTime,
+        AclFeature aclFeature, long modificationTime, long accessTime,
         short replication, long preferredBlockSize) {
-      super(name, permissions, modificationTime, accessTime);
+      super(name, permissions, aclFeature, modificationTime, accessTime);
 
       final long h = HeaderFormat.combineReplication(0L, replication);
       header = HeaderFormat.combinePreferredBlockSize(h, preferredBlockSize);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Wed Feb 19 18:34:52 2014
@@ -213,6 +213,22 @@ public abstract class INodeReference ext
   public final FsPermission getFsPermission(int snapshotId) {
     return referred.getFsPermission(snapshotId);
   }
+
+  @Override
+  final AclFeature getAclFeature(int snapshotId) {
+    return referred.getAclFeature(snapshotId);
+  }
+
+  @Override
+  final void addAclFeature(AclFeature aclFeature) {
+    referred.addAclFeature(aclFeature);
+  }
+
+  @Override
+  final void removeAclFeature() {
+    referred.removeAclFeature();
+  }
+
   @Override
   public final short getFsPermissionShort() {
     return referred.getFsPermissionShort();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java Wed Feb 19 18:34:52 2014
@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.INode.Feature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
 
@@ -220,6 +221,15 @@ public abstract class INodeWithAdditiona
   }
 
   @Override
+  final AclFeature getAclFeature(int snapshotId) {
+    if (snapshotId != Snapshot.CURRENT_STATE_ID) {
+      return getSnapshotINode(snapshotId).getAclFeature();
+    }
+
+    return getFeature(AclFeature.class);
+  }
+
+  @Override
   final long getModificationTime(int snapshotId) {
     if (snapshotId != Snapshot.CURRENT_STATE_ID) {
       return getSnapshotINode(snapshotId).getModificationTime();
@@ -305,4 +315,33 @@ public abstract class INodeWithAdditiona
         + f.getClass().getSimpleName() + " not found.");
     features = arr;
   }
+
+  protected <T extends Feature> T getFeature(Class<? extends Feature> clazz) {
+    for (Feature f : features) {
+      if (f.getClass() == clazz) {
+        @SuppressWarnings("unchecked")
+        T ret = (T) f;
+        return ret;
+      }
+    }
+    return null;
+  }
+
+  public void removeAclFeature() {
+    AclFeature f = getAclFeature();
+    Preconditions.checkNotNull(f);
+    removeFeature(f);
+  }
+
+  public void addAclFeature(AclFeature f) {
+    AclFeature f1 = getAclFeature();
+    if (f1 != null)
+      throw new IllegalStateException("Duplicated ACLFeature");
+
+    addFeature(f);
+  }
+
+  public final Feature[] getFeatures() {
+    return features;
+  }
 }
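
The generic getFeature helper above is what lets the hand-rolled instanceof loops in INodeFile and INodeDirectory collapse into one-liners elsewhere in this commit. It is a linear scan with an exact class comparison, which stays cheap because an inode carries at most a handful of features; the exact match also means a subclass would not satisfy a lookup for its parent type. A stripped-down sketch of the same pattern, using hypothetical stand-in feature classes:

    public class FeatureLookupSketch {
      interface Feature {}
      static class AclFeature implements Feature {}                  // stand-ins for
      static class DirectoryWithQuotaFeature implements Feature {}   // the real features

      @SuppressWarnings("unchecked")
      static <T extends Feature> T getFeature(Feature[] features,
          Class<? extends Feature> clazz) {
        for (Feature f : features) {
          if (f.getClass() == clazz) {  // exact match, not instanceof
            return (T) f;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        Feature[] features = { new DirectoryWithQuotaFeature(), new AclFeature() };
        AclFeature acl = getFeature(features, AclFeature.class);
        System.out.println(acl != null);               // true
        Feature none = getFeature(features, Feature.class);
        System.out.println(none);                      // null: no exact match
      }
    }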

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Feb 19 18:34:52 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.ParentNotDir
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceStatus;
@@ -1290,5 +1292,37 @@ class NameNodeRpcServer implements Namen
       throws IOException {
     return namesystem.listCachePools(prevKey != null ? prevKey : "");
   }
+
+  @Override
+  public void modifyAclEntries(String src, List<AclEntry> aclSpec)
+      throws IOException {
+    namesystem.modifyAclEntries(src, aclSpec);
+  }
+
+  @Override
+  public void removeAclEntries(String src, List<AclEntry> aclSpec)
+      throws IOException {
+    namesystem.removeAclEntries(src, aclSpec);
+  }
+
+  @Override
+  public void removeDefaultAcl(String src) throws IOException {
+    namesystem.removeDefaultAcl(src);
+  }
+
+  @Override
+  public void removeAcl(String src) throws IOException {
+    namesystem.removeAcl(src);
+  }
+
+  @Override
+  public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+    namesystem.setAcl(src, aclSpec);
+  }
+
+  @Override
+  public AclStatus getAclStatus(String src) throws IOException {
+    return namesystem.getAclStatus(src);
+  }
 }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java Wed Feb 19 18:34:52 2014
@@ -36,8 +36,11 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
@@ -154,7 +157,7 @@ public class FSImageFormatPBSnapshot {
         SnapshotSection.Snapshot pbs = SnapshotSection.Snapshot
             .parseDelimitedFrom(in);
         INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
-            parent.getLoaderContext().getStringTable());
+            parent.getLoaderContext());
         int sid = pbs.getSnapshotId();
         INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
             .getInode(root.getId()).asDirectory();
@@ -197,6 +200,7 @@ public class FSImageFormatPBSnapshot {
     private void loadFileDiffList(InputStream in, INodeFile file, int size)
         throws IOException {
       final FileDiffList diffs = new FileDiffList();
+      final LoaderContext state = parent.getLoaderContext();
       for (int i = 0; i < size; i++) {
         SnapshotDiffSection.FileDiff pbf = SnapshotDiffSection.FileDiff
             .parseDelimitedFrom(in);
@@ -204,10 +208,16 @@ public class FSImageFormatPBSnapshot {
         if (pbf.hasSnapshotCopy()) {
           INodeSection.INodeFile fileInPb = pbf.getSnapshotCopy();
           PermissionStatus permission = loadPermission(
-              fileInPb.getPermission(), parent.getLoaderContext()
-                  .getStringTable());
+              fileInPb.getPermission(), state.getStringTable());
+
+          AclFeature acl = null;
+          if (fileInPb.hasAcl()) {
+            acl = new AclFeature(FSImageFormatPBINode.Loader.loadAclEntries(
+                fileInPb.getAcl(), state.getStringTable()));
+          }
+
           copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
-              .toByteArray(), permission, fileInPb.getModificationTime(),
+              .toByteArray(), permission, acl, fileInPb.getModificationTime(),
               fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
               fileInPb.getPreferredBlockSize());
         }
@@ -277,6 +287,8 @@ public class FSImageFormatPBSnapshot {
         dir.addSnapshotFeature(null);
       }
       DirectoryDiffList diffs = dir.getDiffs();
+      final LoaderContext state = parent.getLoaderContext();
+
       for (int i = 0; i < size; i++) {
         // load a directory diff
         SnapshotDiffSection.DirectoryDiff diffInPb = SnapshotDiffSection.
@@ -292,15 +304,22 @@ public class FSImageFormatPBSnapshot {
           INodeSection.INodeDirectory dirCopyInPb = diffInPb.getSnapshotCopy();
           final byte[] name = diffInPb.getName().toByteArray();
           PermissionStatus permission = loadPermission(
-              dirCopyInPb.getPermission(), parent.getLoaderContext()
-                  .getStringTable());
+              dirCopyInPb.getPermission(), state.getStringTable());
+          AclFeature acl = null;
+          if (dirCopyInPb.hasAcl()) {
+            acl = new AclFeature(FSImageFormatPBINode.Loader.loadAclEntries(
+                dirCopyInPb.getAcl(), state.getStringTable()));
+          }
+
           long modTime = dirCopyInPb.getModificationTime();
           boolean noQuota = dirCopyInPb.getNsQuota() == -1
               && dirCopyInPb.getDsQuota() == -1;
+
           copy = noQuota ? new INodeDirectoryAttributes.SnapshotCopy(name,
-              permission, modTime)
+              permission, acl, modTime)
               : new INodeDirectoryAttributes.CopyWithQuota(name, permission,
-                  modTime, dirCopyInPb.getNsQuota(), dirCopyInPb.getDsQuota());
+                  acl, modTime, dirCopyInPb.getNsQuota(),
+                  dirCopyInPb.getDsQuota());
         }
         // load created list
         List<INode> clist = loadCreatedList(in, dir,
@@ -355,7 +374,7 @@ public class FSImageFormatPBSnapshot {
           SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
               .newBuilder().setSnapshotId(s.getId());
           INodeSection.INodeDirectory.Builder db = buildINodeDirectory(sroot,
-              parent.getSaverContext().getStringMap());
+              parent.getSaverContext());
           INodeSection.INode r = INodeSection.INode.newBuilder()
               .setId(sroot.getId())
               .setType(INodeSection.INode.Type.DIRECTORY)
@@ -443,7 +462,7 @@ public class FSImageFormatPBSnapshot {
           INodeFileAttributes copy = diff.snapshotINode;
           if (copy != null) {
             fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
-                .setSnapshotCopy(buildINodeFile(copy, parent.getSaverContext().getStringMap()));
+                .setSnapshotCopy(buildINodeFile(copy, parent.getSaverContext()));
           }
           fb.build().writeDelimitedTo(out);
         }
@@ -480,7 +499,7 @@ public class FSImageFormatPBSnapshot {
           if (!diff.isSnapshotRoot() && copy != null) {
             db.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
                 .setSnapshotCopy(
-                    buildINodeDirectory(copy, parent.getSaverContext().getStringMap()));
+                    buildINodeDirectory(copy, parent.getSaverContext()));
           }
           // process created list and deleted list
           List<INode> created = diff.getChildrenDiff()

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Wed Feb 19 18:34:52 2014
@@ -184,7 +184,7 @@ public class INodeDirectorySnapshottable
   private int snapshotQuota = SNAPSHOT_LIMIT;
 
   public INodeDirectorySnapshottable(INodeDirectory dir) {
-    super(dir, true, true);
+    super(dir, true, dir.getFeatures());
     // add snapshot feature if the original directory does not have it
     if (!isWithSnapshot()) {
       addSnapshotFeature(null);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Wed Feb 19 18:34:52 2014
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Date;
 
@@ -28,12 +29,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /** Snapshot of a sub-tree in the namesystem. */
 @InterfaceAudience.Private
 public class Snapshot implements Comparable<byte[]> {
@@ -139,7 +144,10 @@ public class Snapshot implements Compara
   /** The root directory of the snapshot. */
   static public class Root extends INodeDirectory {
     Root(INodeDirectory other) {
-      super(other, false, false);
+      // Always preserve ACL.
+      super(other, false, Lists.newArrayList(
+        Iterables.filter(Arrays.asList(other.getFeatures()), AclFeature.class))
+        .toArray(new Feature[0]));
     }
 
     @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Feb 19 18:34:52 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.web.ParamF
 import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
+import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
 import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
@@ -315,12 +317,14 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(CreateParentParam.NAME) @DefaultValue(CreateParentParam.DEFAULT)
           final CreateParentParam createParent,
       @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
-          final TokenArgumentParam delegationTokenArgument
-      ) throws IOException, InterruptedException {
+          final TokenArgumentParam delegationTokenArgument,
+      @QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT)
+          final AclPermissionParam aclPermission
+      ) throws IOException, InterruptedException {
     return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
         owner, group, permission, overwrite, bufferSize, replication,
         blockSize, modificationTime, accessTime, renameOptions, createParent,
-        delegationTokenArgument);
+        delegationTokenArgument, aclPermission);
   }
 
   /** Handle HTTP PUT request. */
@@ -364,12 +368,14 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(CreateParentParam.NAME) @DefaultValue(CreateParentParam.DEFAULT)
           final CreateParentParam createParent,
       @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
-          final TokenArgumentParam delegationTokenArgument
+          final TokenArgumentParam delegationTokenArgument,
+      @QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT)
+          final AclPermissionParam aclPermission
       ) throws IOException, InterruptedException {
 
     init(ugi, delegation, username, doAsUser, path, op, destination, owner,
         group, permission, overwrite, bufferSize, replication, blockSize,
-        modificationTime, accessTime, renameOptions, delegationTokenArgument);
+        modificationTime, accessTime, renameOptions, delegationTokenArgument, aclPermission);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -380,7 +386,7 @@ public class NamenodeWebHdfsMethods {
               path.getAbsolutePath(), op, destination, owner, group,
               permission, overwrite, bufferSize, replication, blockSize,
               modificationTime, accessTime, renameOptions, createParent,
-              delegationTokenArgument);
+              delegationTokenArgument, aclPermission);
         } finally {
           REMOTE_ADDRESS.set(null);
         }
@@ -407,7 +413,8 @@ public class NamenodeWebHdfsMethods {
       final AccessTimeParam accessTime,
       final RenameOptionSetParam renameOptions,
       final CreateParentParam createParent,
-      final TokenArgumentParam delegationTokenArgument
+      final TokenArgumentParam delegationTokenArgument,
+      final AclPermissionParam aclPermission
       ) throws IOException, URISyntaxException {
 
     final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -487,6 +494,26 @@ public class NamenodeWebHdfsMethods {
       np.cancelDelegationToken(token);
       return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
+    case MODIFYACLENTRIES: {
+      np.modifyAclEntries(fullpath, aclPermission.getAclPermission(true));
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    case REMOVEACLENTRIES: {
+      np.removeAclEntries(fullpath, aclPermission.getAclPermission(false));
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    case REMOVEDEFAULTACL: {
+      np.removeDefaultAcl(fullpath);
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    case REMOVEACL: {
+      np.removeAcl(fullpath);
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+    case SETACL: {
+      np.setAcl(fullpath, aclPermission.getAclPermission(true));
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
@@ -727,6 +754,15 @@ public class NamenodeWebHdfsMethods {
           WebHdfsFileSystem.getHomeDirectoryString(ugi));
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
     }
+    case GETACLSTATUS: {
+      AclStatus status = np.getAclStatus(fullpath);
+      if (status == null) {
+        throw new FileNotFoundException("File does not exist: " + fullpath);
+      }
+
+      final String js = JsonUtil.toJsonString(status);
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }

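Each new PUT case above simply forwards to the corresponding NamenodeProtocols call and returns 200 with an empty octet-stream body. A minimal sketch of the matching client-side request, assuming the AclPermissionParam query parameter is named "aclspec" and a NameNode HTTP address of localhost:50070 (both assumptions; adjust for a real cluster):

import java.net.HttpURLConnection;
import java.net.URL;

public class SetAclOverHttp {
  public static void main(String[] args) throws Exception {
    // PUT /webhdfs/v1/<path>?op=SETACL&aclspec=<spec>; user.name selects the
    // caller on simple-auth clusters.
    URL url = new URL("http://localhost:50070/webhdfs/v1/tmp/f"
        + "?op=SETACL&user.name=hdfs"
        + "&aclspec=user::rwx,user:bruce:rw-,group::r-x,other::r--");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    System.out.println(conn.getResponseCode());  // 200 when the op succeeds
    conn.disconnect();
  }
}
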
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Wed Feb 19 18:34:52 2014
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.web;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
@@ -613,4 +615,44 @@ public class JsonUtil {
 
     return checksum;
   }
+  /** Convert an AclStatus object to a Json string. */
+  public static String toJsonString(final AclStatus status) {
+    if (status == null) {
+      return null;
+    }
+
+    final Map<String, Object> m = new TreeMap<String, Object>();
+    m.put("owner", status.getOwner());
+    m.put("group", status.getGroup());
+    m.put("stickyBit", status.isStickyBit());
+    m.put("entries", status.getEntries());
+    final Map<String, Map<String, Object>> finalMap =
+        new TreeMap<String, Map<String, Object>>();
+    finalMap.put(AclStatus.class.getSimpleName(), m);
+    return JSON.toString(finalMap);
+  }
+
+  /** Convert a Json map to an AclStatus object. */
+  public static AclStatus toAclStatus(final Map<?, ?> json) {
+    if (json == null) {
+      return null;
+    }
+
+    final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
+
+    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
+    aclStatusBuilder.owner((String) m.get("owner"));
+    aclStatusBuilder.group((String) m.get("group"));
+    aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
+
+    final Object[] entries = (Object[]) m.get("entries");
+
+    List<AclEntry> aclEntryList = new ArrayList<AclEntry>();
+    for (int i = 0; i < entries.length; i++) {
+      AclEntry aclEntry = AclEntry.parseAclEntry((String) entries[i], true);
+      aclEntryList.add(aclEntry);
+    }
+    aclStatusBuilder.addEntries(aclEntryList);
+    return aclStatusBuilder.build();
+  }
 }

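Because toJsonString above puts the fields into a TreeMap keyed under the class's simple name, the wire form is an alphabetically ordered object wrapped under "AclStatus", and toAclStatus expects each entry in the array to be the string form that AclEntry.parseAclEntry accepts. A round-trip sketch with hypothetical values, assuming the same org.mortbay JSON helper this file already uses and that it renders each AclEntry via its toString form:

import java.util.Map;

import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.mortbay.util.ajax.JSON;

public class AclJsonRoundTrip {
  public static void main(String[] args) {
    AclStatus in = new AclStatus.Builder()
        .owner("hdfs").group("supergroup").stickyBit(false)
        .addEntries(AclEntry.parseAclSpec(
            "user:bruce:rwx,group:sales:r-x", true))
        .build();
    String js = JsonUtil.toJsonString(in);
    // Roughly: {"AclStatus":{"entries":["user:bruce:rwx","group:sales:r-x"],
    //           "group":"supergroup","owner":"hdfs","stickyBit":false}}
    System.out.println(js);
    AclStatus out = JsonUtil.toAclStatus((Map<?, ?>) JSON.parse(js));
    System.out.println(out.getEntries());  // parsed back into AclEntry objects
  }
}
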
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Feb 19 18:34:52 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
+import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
 import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
@@ -698,6 +701,17 @@ public class WebHdfsFileSystem extends F
   }
 
   @Override
+  public AclStatus getAclStatus(Path f) throws IOException {
+    final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
+    final Map<?, ?> json = run(op, f);
+    AclStatus status = JsonUtil.toAclStatus(json);
+    if (status == null) {
+      throw new FileNotFoundException("File does not exist: " + f);
+    }
+    return status;
+  }
+
+  @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
@@ -758,6 +772,44 @@ public class WebHdfsFileSystem extends F
   }
 
   @Override
+  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
+    run(op, path, new AclPermissionParam(aclSpec));
+  }
+
+  @Override
+  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
+    run(op, path, new AclPermissionParam(aclSpec));
+  }
+
+  @Override
+  public void removeDefaultAcl(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
+    run(op, path);
+  }
+
+  @Override
+  public void removeAcl(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
+    run(op, path);
+  }
+
+  @Override
+  public void setAcl(final Path p, final List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.SETACL;
+    run(op, p, new AclPermissionParam(aclSpec));
+  }
+
+  @Override
   public boolean setReplication(final Path p, final short replication
      ) throws IOException {
     statistics.incrementWriteOps(1);

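With these overrides in place, ACLs can be manipulated through the ordinary FileSystem API over a webhdfs:// URI. A usage sketch; the URI, user, path and ACL spec here are hypothetical, and AclEntry.parseAclSpec is part of the new fs.permission ACL API:

import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;

public class WebHdfsAclExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode:50070"), new Configuration());
    Path p = new Path("/tmp/f");
    // parseAclSpec turns "user:bruce:rw-" style text into AclEntry objects;
    // true means the spec includes permissions.  setAcl requires the full
    // spec, including the unnamed user, group and other entries.
    List<AclEntry> spec = AclEntry.parseAclSpec(
        "user::rw-,user:bruce:rw-,group::r--,other::r--", true);
    fs.setAcl(p, spec);
    AclStatus st = fs.getAclStatus(p);   // throws FileNotFoundException if absent
    System.out.println(st.getEntries());
    fs.removeAcl(p);                     // back to plain permission bits
  }
}
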
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java Wed Feb 19 18:34:52 2014
@@ -35,6 +35,7 @@ public class GetOpParam extends HttpOpPa
 
     /** GET_BLOCK_LOCATIONS is a private unstable op. */
     GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
+    GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
 
     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java Wed Feb 19 18:34:52 2014
@@ -37,6 +37,12 @@ public class PutOpParam extends HttpOpPa
     RENEWDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
     CANCELDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
     
+    MODIFYACLENTRIES(false, HttpURLConnection.HTTP_OK),
+    REMOVEACLENTRIES(false, HttpURLConnection.HTTP_OK),
+    REMOVEDEFAULTACL(false, HttpURLConnection.HTTP_OK),
+    REMOVEACL(false, HttpURLConnection.HTTP_OK),
+    SETACL(false, HttpURLConnection.HTTP_OK),
+    
     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 
     final boolean doOutputAndRedirect;

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1547224-1569863

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Wed Feb 19 18:34:52 2014
@@ -30,6 +30,7 @@ package hadoop.hdfs;
 
 import "Security.proto";
 import "hdfs.proto";
+import "acl.proto";
 
 /**
  * The ClientNamenodeProtocol Service defines the interface between a client 
@@ -719,4 +720,16 @@ service ClientNamenodeProtocol {
       returns(GetSnapshotDiffReportResponseProto);
   rpc isFileClosed(IsFileClosedRequestProto)
       returns(IsFileClosedResponseProto);
+  rpc modifyAclEntries(ModifyAclEntriesRequestProto)
+      returns(ModifyAclEntriesResponseProto);
+  rpc removeAclEntries(RemoveAclEntriesRequestProto)
+      returns(RemoveAclEntriesResponseProto);
+  rpc removeDefaultAcl(RemoveDefaultAclRequestProto)
+      returns(RemoveDefaultAclResponseProto);
+  rpc removeAcl(RemoveAclRequestProto)
+      returns(RemoveAclResponseProto);
+  rpc setAcl(SetAclRequestProto)
+      returns(SetAclResponseProto);
+  rpc getAclStatus(GetAclStatusRequestProto)
+      returns(GetAclStatusResponseProto);
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto Wed Feb 19 18:34:52 2014
@@ -22,6 +22,7 @@ option java_outer_classname = "FsImagePr
 package hadoop.hdfs.fsimage;
 
 import "hdfs.proto";
+import "acl.proto";
 
 /**
  * This file defines the on-disk layout of the file system image. The
@@ -88,6 +89,23 @@ message INodeSection {
     optional string clientMachine = 2;
   }
 
+  message AclFeatureProto {
+    /**
+     * An ACL entry is represented by a 32-bit integer in Big Endian
+     * format. The bits can be divided into five segments:
+     * [0:2) || [2:26) || [26:27) || [27:29) || [29:32)
+     *
+     * [0:2) -- reserved for future uses.
+     * [2:26) -- the name of the entry, which is an ID that points to a
+     * string in the StringTableSection.
+     * [26:27) -- the scope of the entry (AclEntryScopeProto)
+     * [27:29) -- the type of the entry (AclEntryTypeProto)
+     * [29:32) -- the permission of the entry (FsActionProto)
+     *
+     */
+    repeated fixed32 entries = 2 [packed = true];
+  }
+
   message INodeFile {
     optional uint32 replication = 1;
     optional uint64 modificationTime = 2;
@@ -96,6 +114,7 @@ message INodeSection {
     optional fixed64 permission = 5;
     repeated BlockProto blocks = 6;
     optional FileUnderConstructionFeature fileUC = 7;
+    optional AclFeatureProto acl = 8;
   }
 
   message INodeDirectory {
@@ -105,6 +124,7 @@ message INodeSection {
     // diskspace quota
     optional uint64 dsQuota = 3;
     optional fixed64 permission = 4;
+    optional AclFeatureProto acl = 5;
   }
 
   message INodeSymlink {
@@ -280,5 +300,4 @@ message CacheManagerSection {
   required uint32 numDirectives   = 3;
   // repeated CachePoolInfoProto pools
   // repeated CacheDirectiveInfoProto directives
-}
-
+}
\ No newline at end of file

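A sketch of unpacking one packed fixed32 entry per the segment layout documented in AclFeatureProto above, treating bit 0 as the most significant bit of the big-endian word. The shifts and masks here are derived from that comment rather than copied from Hadoop source, and the sample value is hypothetical:

public class AclEntryBits {
  static final int PERM_MASK   = (1 << 3) - 1;                  // [29:32) permission
  static final int TYPE_SHIFT  = 3, TYPE_MASK  = (1 << 2) - 1;  // [27:29) type
  static final int SCOPE_SHIFT = 5, SCOPE_MASK = 1;             // [26:27) scope
  static final int NAME_SHIFT  = 6, NAME_MASK  = (1 << 24) - 1; // [2:26) name ID

  public static void main(String[] args) {
    int entry  = 0x000001E7;                        // hypothetical packed entry
    int perm   = entry & PERM_MASK;                 // maps to FsActionProto
    int type   = (entry >>> TYPE_SHIFT) & TYPE_MASK;   // AclEntryTypeProto
    int scope  = (entry >>> SCOPE_SHIFT) & SCOPE_MASK; // AclEntryScopeProto
    int nameId = (entry >>> NAME_SHIFT) & NAME_MASK;   // StringTableSection ID
    System.out.printf("name=%d scope=%d type=%d perm=%o%n",
        nameId, scope, type, perm);
  }
}
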
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Feb 19 18:34:52 2014
@@ -364,6 +364,16 @@
 -->
 
 <property>
+  <name>dfs.namenode.acls.enabled</name>
+  <value>false</value>
+  <description>
+    Set to true to enable support for HDFS ACLs (Access Control Lists).  By
+    default, ACLs are disabled.  When ACLs are disabled, the NameNode rejects
+    all RPCs related to setting or getting ACLs.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.access.token.enable</name>
   <value>false</value>
   <description>

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1547224-1569863

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1547224-1569863

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1547224-1569863

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsPermissionsGuide.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsPermissionsGuide.apt.vm?rev=1569870&r1=1569869&r2=1569870&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsPermissionsGuide.apt.vm (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsPermissionsGuide.apt.vm Wed Feb 19 18:34:52 2014
@@ -47,6 +47,10 @@ HDFS Permissions Guide
    client process, and its group is the group of the parent directory (the
    BSD rule).
 
+   HDFS also provides optional support for POSIX ACLs (Access Control Lists) to
+   augment file permissions with finer-grained rules for specific named users or
+   named groups.  ACLs are discussed in greater detail later in this document.
+
    Each client process that accesses HDFS has a two-part identity composed
    of the user name, and groups list. Whenever HDFS must do a permissions
    check for a file or directory foo accessed by a client process,
@@ -219,9 +223,173 @@ HDFS Permissions Guide
    identity matches the super-user, parts of the name space may be
    inaccessible to the web server.
 
+* ACLs (Access Control Lists)
+
+   In addition to the traditional POSIX permissions model, HDFS also supports
+   POSIX ACLs (Access Control Lists).  ACLs are useful for implementing
+   permission requirements that differ from the natural organizational hierarchy
+   of users and groups.  An ACL provides a way to set different permissions for
+   specific named users or named groups, not only the file's owner and the
+   file's group.
+
+   By default, support for ACLs is disabled, and the NameNode disallows creation
+   of ACLs.  To enable support for ACLs, set <<<dfs.namenode.acls.enabled>>> to
+   true in the NameNode configuration.
+
+   An ACL consists of a set of ACL entries.  Each ACL entry names a specific
+   user or group and grants or denies read, write and execute permissions for
+   that specific user or group.  For example:
+
++--
+   user::rw-
+   user:bruce:rwx                  #effective:r--
+   group::r-x                      #effective:r--
+   group:sales:rwx                 #effective:r--
+   mask::r--
+   other::r--
++--
+
+   ACL entries consist of a type, an optional name and a permission string.
+   For display purposes, ':' is used as the delimiter between each field.  In
+   this example ACL, the file owner has read-write access, the file group has
+   read-execute access and others have read access.  So far, this is equivalent
+   to setting the file's permission bits to 654.
+
+   Additionally, there are two extended ACL entries for the named user bruce and
+   the named group sales, both granted full access.  The mask is a special ACL
+   entry that filters the permissions granted to all named user entries and
+   named group entries, and also the unnamed group entry.  In the example, the
+   mask has only read permissions, and we can see that the effective permissions
+   of several ACL entries have been filtered accordingly.
+
+   Every ACL must have a mask.  If the user doesn't supply a mask while setting
+   an ACL, then a mask is inserted automatically by calculating the union of
+   permissions on all entries that would be filtered by the mask.
+
+   Running <<<chmod>>> on a file that has an ACL actually changes the
+   permissions of the mask.  Since the mask acts as a filter, this effectively
+   constrains the permissions of all extended ACL entries instead of changing
+   just the group entry and possibly missing other extended ACL entries.
+
+   The model also differentiates between an "access ACL", which defines the
+   rules to enforce during permission checks, and a "default ACL", which defines
+   the ACL entries that new child files or sub-directories receive automatically
+   during creation.  For example:
+
++--
+   user::rwx
+   group::r-x
+   other::r-x
+   default:user::rwx
+   default:user:bruce:rwx          #effective:r-x
+   default:group::r-x
+   default:group:sales:rwx         #effective:r-x
+   default:mask::r-x
+   default:other::r-x
++--
+
+   Only directories may have a default ACL.  When a new file or sub-directory is
+   created, it automatically copies the default ACL of its parent into its own
+   access ACL.  A new sub-directory also copies the parent's default ACL into
+   its own default ACL.  In this way, the default ACL is copied down through
+   arbitrarily deep levels of the file system tree as new sub-directories get created.
+
+   The exact permission values in the new child's access ACL are subject to
+   filtering by the mode parameter.  Considering the default umask of 022, this
+   is typically 755 for new directories and 644 for new files.  The mode
+   parameter filters the copied permission values for the unnamed user (file
+   owner), the mask and other.  Using this particular example ACL, and creating
+   a new sub-directory with 755 for the mode, this mode filtering has no effect
+   on the final result.  However, if we consider creation of a file with 644 for
+   the mode, then mode filtering causes the new file's ACL to receive read-write
+   for the unnamed user (file owner), read for the mask and read for others.
+   This mask also means that effective permissions for named user bruce and
+   named group sales are only read.
+
+   Note that the copy occurs at time of creation of the new file or
+   sub-directory.  Subsequent changes to the parent's default ACL do not change
+   existing children.
+
+   The default ACL must have all minimum required ACL entries, including the
+   unnamed user (file owner), unnamed group (file group) and other entries.  If
+   the user doesn't supply one of these entries while setting a default ACL,
+   then the entries are inserted automatically by copying the corresponding
+   permissions from the access ACL, or permission bits if there is no access
+   ACL.  The default ACL must also have a mask.  As described above, if the mask
+   is unspecified, then a mask is inserted automatically by calculating the
+   union of permissions on all entries that would be filtered by the mask.
+
+   When considering a file that has an ACL, the algorithm for permission checks
+   changes to:
+
+     * If the user name matches the owner of the file, then the owner
+       permissions are tested;
+
+     * Else if the user name matches the name in one of the named user entries,
+       then these permissions are tested, filtered by the mask permissions;
+
+     * Else if the group of the file matches any member of the groups list,
+       and if these permissions filtered by the mask grant access, then these
+       permissions are used;
+
+     * Else if there is a named group entry matching a member of the groups list,
+       and if these permissions filtered by the mask grant access, then these
+       permissions are used;
+
+     * Else if the file group or any named group entry matches a member of the
+       groups list, but access was not granted by any of those permissions, then
+       access is denied;
+
+     * Otherwise the other permissions of the file are tested.
+
+   Best practice is to rely on traditional permission bits to implement most
+   permission requirements, and define a smaller number of ACLs to augment the
+   permission bits with a few exceptional rules.  A file with an ACL incurs an
+   additional cost in memory in the NameNode compared to a file that has only
+   permission bits.
+
+* ACLs File System API
+
+   New methods:
+
+     * <<<public void modifyAclEntries(Path path, List<AclEntry> aclSpec) throws
+       IOException;>>>
+
+     * <<<public void removeAclEntries(Path path, List<AclEntry> aclSpec) throws
+       IOException;>>>
+
+     * <<<public void removeDefaultAcl(Path path) throws
+       IOException;>>>
+
+     * <<<public void removeAcl(Path path) throws IOException;>>>
+
+     * <<<public void setAcl(Path path, List<AclEntry> aclSpec) throws
+       IOException;>>>
+
+     * <<<public AclStatus getAclStatus(Path path) throws IOException;>>>
+
+* ACLs Shell Commands
+
+     * <<<hdfs dfs -getfacl [-R] <path> >>>
+
+       Displays the Access Control Lists (ACLs) of files and directories. If a
+       directory has a default ACL, then getfacl also displays the default ACL.
+
+     * <<<hdfs dfs -setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>] >>>
+
+       Sets Access Control Lists (ACLs) of files and directories.
+
+     * <<<hdfs dfs -ls <args> >>>
+
+       The output of <<<ls>>> will append a '+' character to the permissions
+       string of any file or directory that has an ACL.
+
+       See the {{{../hadoop-common/FileSystemShell.html}File System Shell}}
+       documentation for full coverage of these commands.
+
 * Configuration Parameters
 
-     * <<<dfs.permissions = true>>>
+     * <<<dfs.permissions.enabled = true>>>
 
        If yes use the permissions system as described here. If no,
        permission checking is turned off, but all other behavior is
@@ -255,3 +423,9 @@ HDFS Permissions Guide
 
        The administrators for the cluster specified as an ACL. This
        controls who can access the default servlets, etc. in the HDFS.
+
+     * <<<dfs.namenode.acls.enabled = true>>>
+
+       Set to true to enable support for HDFS ACLs (Access Control Lists).  By
+       default, ACLs are disabled.  When ACLs are disabled, the NameNode rejects
+       all attempts to set an ACL.

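The automatic mask rule described in the guide above (the union of the permissions on every entry the mask filters) is straightforward to reproduce with the new API types. A sketch using FsAction's union operation, assuming AclEntry.parseAclSpec from the new fs.permission API; the spec is hypothetical:

import java.util.List;

import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;

public class MaskUnionExample {
  public static void main(String[] args) {
    // user:bruce:rwx, group::r-x, group:sales:rwx -- the three kinds of
    // entries the mask filters: named users, the unnamed group, named groups.
    List<AclEntry> filtered = AclEntry.parseAclSpec(
        "user:bruce:rwx,group::r-x,group:sales:rwx", true);
    FsAction mask = FsAction.NONE;
    for (AclEntry e : filtered) {
      mask = mask.or(e.getPermission());   // union of filtered permissions
    }
    System.out.println("mask::" + mask.SYMBOL);  // rwx for this input
  }
}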

