hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject svn commit: r1563304 - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ s...
Date Fri, 31 Jan 2014 22:21:20 GMT
Author: cnauroth
Date: Fri Jan 31 22:21:19 2014
New Revision: 1563304

URL: http://svn.apache.org/r1563304
Log:
HDFS-5614. NameNode: implement handling of ACLs in combination with snapshots. Contributed by Chris Nauroth.

Added:
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
Modified:
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDiff.java

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt Fri Jan 31 22:21:19 2014
@@ -50,6 +50,9 @@ HDFS-4685 (Unreleased)
     HDFS-5608. WebHDFS: implement ACL APIs.
     (Sachin Jose and Renil Joseph via cnauroth)
 
+    HDFS-5614. NameNode: implement handling of ACLs in combination with
+    snapshots. (cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 31 22:21:19 2014
@@ -2651,6 +2651,7 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class,
                                      AclException.class,
                                      FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
                                      SafeModeException.class,
                                      SnapshotAccessControlException.class,
                                      UnresolvedPathException.class);
@@ -2666,6 +2667,7 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class,
                                      AclException.class,
                                      FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
                                      SafeModeException.class,
                                      SnapshotAccessControlException.class,
                                      UnresolvedPathException.class);
@@ -2680,6 +2682,7 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class,
                                      AclException.class,
                                      FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
                                      SafeModeException.class,
                                      SnapshotAccessControlException.class,
                                      UnresolvedPathException.class);
@@ -2694,6 +2697,7 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class,
                                      AclException.class,
                                      FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
                                      SafeModeException.class,
                                      SnapshotAccessControlException.class,
                                      UnresolvedPathException.class);
@@ -2708,6 +2712,7 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class,
                                      AclException.class,
                                      FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
                                      SafeModeException.class,
                                      SnapshotAccessControlException.class,
                                      UnresolvedPathException.class);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java Fri Jan 31 22:21:19 2014
@@ -31,13 +31,13 @@ import org.apache.hadoop.fs.permission.A
 public class AclFeature implements INode.Feature {
   public static final List<AclEntry> EMPTY_ENTRY_LIST = Collections.emptyList();
 
-  private List<AclEntry> entries;
+  private final List<AclEntry> entries;
 
-  public List<AclEntry> getEntries() {
-    return entries;
+  public AclFeature(List<AclEntry> entries) {
+    this.entries = entries;
   }
 
-  public void setEntries(List<AclEntry> entries) {
-    this.entries = entries;
+  public List<AclEntry> getEntries() {
+    return entries;
   }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java Fri Jan 31 22:21:19 2014
@@ -62,17 +62,17 @@ final class AclStorage {
   /**
    * Reads the existing extended ACL entries of an inode.  This method returns
    * only the extended ACL entries stored in the AclFeature.  If the inode does
-   * not have an ACL, then this method returns an empty list.
+   * not have an ACL, then this method returns an empty list.  This method
+   * supports querying by snapshot ID.
    *
-   * @param inode INodeWithAdditionalFields to read
-   * @param snapshotId int latest snapshot ID of inode
+   * @param inode INode to read
+   * @param snapshotId int ID of snapshot to read
    * @return List<AclEntry> containing extended inode ACL entries
    */
-  public static List<AclEntry> readINodeAcl(INodeWithAdditionalFields inode,
-      int snapshotId) {
-    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+  public static List<AclEntry> readINodeAcl(INode inode, int snapshotId) {
+    FsPermission perm = inode.getFsPermission(snapshotId);
     if (perm.getAclBit()) {
-      return inode.getAclFeature().getEntries();
+      return inode.getAclFeature(snapshotId).getEntries();
     } else {
       return Collections.emptyList();
     }
@@ -85,15 +85,16 @@ final class AclStorage {
    * logically has an ACL, even if no ACL has been set explicitly.  If the inode
    * does not have an extended ACL, then the result is a minimal ACL consising of
    * exactly 3 entries that correspond to the owner, group and other permissions.
+   * This method always reads the inode's current state and does not support
+   * querying by snapshot ID.  This is because the method is intended to support
+   * ACL modification APIs, which always apply a delta on top of current state.
    *
-   * @param inode INodeWithAdditionalFields to read
-   * @param snapshotId int latest snapshot ID of inode
+   * @param inode INode to read
    * @return List<AclEntry> containing all logical inode ACL entries
    */
-  public static List<AclEntry> readINodeLogicalAcl(
-      INodeWithAdditionalFields inode, int snapshotId) {
+  public static List<AclEntry> readINodeLogicalAcl(INode inode) {
     final List<AclEntry> existingAcl;
-    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+    FsPermission perm = inode.getFsPermission();
     if (perm.getAclBit()) {
       // Split ACL entries stored in the feature into access vs. default.
       List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
@@ -150,13 +151,13 @@ final class AclStorage {
   /**
    * Completely removes the ACL from an inode.
    *
-   * @param inode INodeWithAdditionalFields to update
+   * @param inode INode to update
    * @param snapshotId int latest snapshot ID of inode
    * @throws QuotaExceededException if quota limit is exceeded
    */
-  public static void removeINodeAcl(INodeWithAdditionalFields inode,
-      int snapshotId) throws QuotaExceededException {
-    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+  public static void removeINodeAcl(INode inode, int snapshotId)
+      throws QuotaExceededException {
+    FsPermission perm = inode.getFsPermission();
     if (perm.getAclBit()) {
       List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
       final FsAction groupPerm;
@@ -176,7 +177,7 @@ final class AclStorage {
       }
 
       // Remove the feature and turn off the ACL bit.
-      inode.removeAclFeature();
+      inode.removeAclFeature(snapshotId);
       FsPermission newPerm = new FsPermission(perm.getUserAction(),
         groupPerm, perm.getOtherAction(),
         perm.getStickyBit(), false);
@@ -189,17 +190,16 @@ final class AclStorage {
    * stores the entries to the inode's {@link FsPermission} and
    * {@link AclFeature}.
    *
-   * @param inode INodeWithAdditionalFields to update
+   * @param inode INode to update
    * @param newAcl List<AclEntry> containing new ACL entries
    * @param snapshotId int latest snapshot ID of inode
    * @throws AclException if the ACL is invalid for the given inode
    * @throws QuotaExceededException if quota limit is exceeded
    */
-  public static void updateINodeAcl(INodeWithAdditionalFields inode,
-      List<AclEntry> newAcl, int snapshotId) throws AclException,
-      QuotaExceededException {
+  public static void updateINodeAcl(INode inode, List<AclEntry> newAcl,
+      int snapshotId) throws AclException, QuotaExceededException {
     assert newAcl.size() >= 3;
-    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+    FsPermission perm = inode.getFsPermission();
     final FsPermission newPerm;
     if (newAcl.size() > 3) {
       // This is an extended ACL.  Split entries into access vs. default.
@@ -238,17 +238,15 @@ final class AclStorage {
       // Add all default entries to the feature.
       featureEntries.addAll(defaultEntries);
 
-      // Attach entries to the feature, creating a new feature if needed.
-      AclFeature aclFeature = inode.getAclFeature();
-      if (aclFeature == null) {
-        aclFeature = new AclFeature();
-        inode.addAclFeature(aclFeature);
+      // Attach entries to the feature.
+      if (perm.getAclBit()) {
+        inode.removeAclFeature(snapshotId);
       }
-      aclFeature.setEntries(featureEntries);
+      inode.addAclFeature(new AclFeature(featureEntries), snapshotId);
     } else {
       // This is a minimal ACL.  Remove the ACL feature if it previously had one.
       if (perm.getAclBit()) {
-        inode.removeAclFeature();
+        inode.removeAclFeature(snapshotId);
       }
 
       // Calculate new permission bits.  For a correctly sorted ACL, the owner,

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jan 31 22:21:19 2014
@@ -1629,6 +1629,14 @@ public class FSDirectory implements Clos
    */
   private HdfsFileStatus getFileInfo4DotSnapshot(String src)
       throws UnresolvedLinkException {
+    if (getINode4DotSnapshot(src) != null) {
+      return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
+          HdfsFileStatus.EMPTY_NAME, -1L, 0);
+    }
+    return null;
+  }
+
+  private INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
     Preconditions.checkArgument(
         src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), 
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
@@ -1640,8 +1648,7 @@ public class FSDirectory implements Clos
     if (node != null
         && node.isDirectory()
         && node.asDirectory() instanceof INodeDirectorySnapshottable) {
-      return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-          HdfsFileStatus.EMPTY_NAME, -1L, 0);
+      return node;
     }
     return null;
   }
@@ -2697,10 +2704,9 @@ public class FSDirectory implements Clos
       List<AclEntry> aclSpec) throws IOException {
     assert hasWriteLock();
     INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
-    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
-      snapshotId);
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> newAcl = AclTransformation.mergeAclEntries(existingAcl,
       aclSpec);
     AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@@ -2721,10 +2727,9 @@ public class FSDirectory implements Clos
       List<AclEntry> aclSpec) throws IOException {
     assert hasWriteLock();
     INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
-    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
-      snapshotId);
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
       existingAcl, aclSpec);
     AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@@ -2745,10 +2750,9 @@ public class FSDirectory implements Clos
       throws IOException {
     assert hasWriteLock();
     INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
-    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
-      snapshotId);
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
       existingAcl);
     AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@@ -2768,7 +2772,7 @@ public class FSDirectory implements Clos
   private void unprotectedRemoveAcl(String src) throws IOException {
     assert hasWriteLock();
     INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
-    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     AclStorage.removeINodeAcl(inode, snapshotId);
   }
@@ -2793,10 +2797,9 @@ public class FSDirectory implements Clos
 
     assert hasWriteLock();
     INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
-    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
-    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
-      snapshotId);
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
       aclSpec);
     AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
@@ -2804,12 +2807,18 @@ public class FSDirectory implements Clos
   }
 
   AclStatus getAclStatus(String src) throws IOException {
+    String srcs = normalizePath(src);
     readLock();
     try {
-      INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
-      final INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(
-        src, iip);
-      int snapshotId = iip.getLatestSnapshotId();
+      // There is no real inode for the path ending in ".snapshot", so return a
+      // non-null, unpopulated AclStatus.  This is similar to getFileInfo.
+      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
+          getINode4DotSnapshot(srcs) != null) {
+        return new AclStatus.Builder().owner("").group("").build();
+      }
+      INodesInPath iip = rootDir.getLastINodeInPath(srcs, true);
+      INode inode = resolveLastINode(src, iip);
+      int snapshotId = iip.getPathSnapshotId();
       List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
       return new AclStatus.Builder()
           .owner(inode.getUserName()).group(inode.getGroupName())
@@ -2820,12 +2829,12 @@ public class FSDirectory implements Clos
     }
   }
 
-  private static INodeWithAdditionalFields resolveINodeWithAdditionalFields(
-      String src, INodesInPath iip) throws FileNotFoundException {
+  private static INode resolveLastINode(String src, INodesInPath iip)
+      throws FileNotFoundException {
     INode inode = iip.getLastINode();
-    if (!(inode instanceof INodeWithAdditionalFields))
+    if (inode == null)
       throw new FileNotFoundException("cannot find " + src);
-    return (INodeWithAdditionalFields)inode;
+    return inode;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Jan 31 22:21:19 2014
@@ -712,9 +712,11 @@ public class FSImageFormat {
         file.toUnderConstruction(clientName, clientMachine, null);
       }
 
-      AclFeature aclFeature = loadAclFeature(in, imgVersion);
-      if (aclFeature != null) {
-        file.addAclFeature(aclFeature);
+      if (permissions.getPermission().getAclBit()) {
+        AclFeature aclFeature = loadAclFeature(in, imgVersion);
+        if (aclFeature != null) {
+          file.addAclFeature(aclFeature);
+        }
       }
 
       return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
@@ -751,9 +753,11 @@ public class FSImageFormat {
         dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
       }
 
-      AclFeature aclFeature = loadAclFeature(in, imgVersion);
-      if (aclFeature != null) {
-        dir.addAclFeature(aclFeature);
+      if (permissions.getPermission().getAclBit()) {
+        AclFeature aclFeature = loadAclFeature(in, imgVersion);
+        if (aclFeature != null) {
+          dir.addAclFeature(aclFeature);
+        }
       }
 
       if (withSnapshot) {
@@ -802,8 +806,8 @@ public class FSImageFormat {
       if (LayoutVersion.supports(Feature.EXTENDED_ACL, imgVersion)) {
         AclFsImageProto p = AclFsImageProto
             .parseDelimitedFrom((DataInputStream) in);
-        aclFeature = new AclFeature();
-        aclFeature.setEntries(PBHelper.convertAclEntry(p.getEntriesList()));
+        aclFeature = new AclFeature(PBHelper.convertAclEntry(
+            p.getEntriesList()));
       }
       return aclFeature;
     }
@@ -825,9 +829,10 @@ public class FSImageFormat {
       final short replication = namesystem.getBlockManager().adjustReplication(
           in.readShort());
       final long preferredBlockSize = in.readLong();
-      
-      return new INodeFileAttributes.SnapshotCopy(name, permissions, modificationTime,
-          accessTime, replication, preferredBlockSize);
+      AclFeature aclFeature = permissions.getPermission().getAclBit() ?
+          loadAclFeature(in, layoutVersion) : null;
+      return new INodeFileAttributes.SnapshotCopy(name, permissions, aclFeature,
+          modificationTime, accessTime, replication, preferredBlockSize);
     }
 
     public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
@@ -845,11 +850,14 @@ public class FSImageFormat {
       //read quotas
       final long nsQuota = in.readLong();
       final long dsQuota = in.readLong();
+      AclFeature aclFeature = permissions.getPermission().getAclBit() ?
+          loadAclFeature(in, layoutVersion) : null;
   
       return nsQuota == -1L && dsQuota == -1L?
-          new INodeDirectoryAttributes.SnapshotCopy(name, permissions, modificationTime)
+          new INodeDirectoryAttributes.SnapshotCopy(name, permissions,
+            aclFeature, modificationTime)
         : new INodeDirectoryAttributes.CopyWithQuota(name, permissions,
-            modificationTime, nsQuota, dsQuota);
+            aclFeature, modificationTime, nsQuota, dsQuota);
     }
   
     private void loadFilesUnderConstruction(DataInput in,

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Fri Jan 31 22:21:19 2014
@@ -220,6 +220,7 @@ public class FSImageSerialization {
 
     out.writeShort(file.getFileReplication());
     out.writeLong(file.getPreferredBlockSize());
+    writeAclFeature(file, out);
   }
 
   private static void writeQuota(Quota.Counts quota, DataOutput out)
@@ -267,6 +268,7 @@ public class FSImageSerialization {
     writePermissionStatus(a, out);
     out.writeLong(a.getModificationTime());
     writeQuota(a.getQuotaCounts(), out);
+    writeAclFeature(a, out);
   }
 
   /**
@@ -288,16 +290,18 @@ public class FSImageSerialization {
     writePermissionStatus(node, out);
   }
 
-  private static void writeAclFeature(INodeWithAdditionalFields node,
-      DataOutput out) throws IOException {
-    AclFsImageProto.Builder b = AclFsImageProto.newBuilder();
-    OutputStream os = (OutputStream) out;
-
-    AclFeature feature = node.getAclFeature();
-    if (feature != null)
-      b.addAllEntries(PBHelper.convertAclEntryProto(feature.getEntries()));
+  private static void writeAclFeature(INodeAttributes node, DataOutput out)
+      throws IOException {
+    if (node.getFsPermission().getAclBit()) {
+      AclFsImageProto.Builder b = AclFsImageProto.newBuilder();
+      OutputStream os = (OutputStream) out;
+
+      AclFeature feature = node.getAclFeature();
+      if (feature != null)
+        b.addAllEntries(PBHelper.convertAclEntryProto(feature.getEntries()));
 
-    b.build().writeDelimitedTo(os);
+      b.build().writeDelimitedTo(os);
+    }
   }
 
   /** Serialize a {@link INodeReference} node */

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Jan 31 22:21:19 2014
@@ -240,16 +240,13 @@ class FSPermissionChecker {
     }
     FsPermission mode = inode.getFsPermission(snapshotId);
     if (mode.getAclBit()) {
-      // TODO: handling of INodeReference?
-      AclFeature aclFeature = inode instanceof INodeWithAdditionalFields ?
-        ((INodeWithAdditionalFields)inode).getAclFeature() : null;
-      if (aclFeature != null) {
-        List<AclEntry> featureEntries = aclFeature.getEntries();
-        // It's possible that the inode has a default ACL but no access ACL.
-        if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
-          checkAccessAcl(inode, snapshotId, access, mode, featureEntries);
-          return;
-        }
+      AclFeature aclFeature = inode.getAclFeature(snapshotId);
+      assert aclFeature != null;
+      List<AclEntry> featureEntries = aclFeature.getEntries();
+      // It's possible that the inode has a default ACL but no access ACL.
+      if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
+        checkAccessAcl(inode, snapshotId, access, mode, featureEntries);
+        return;
       }
     }
     checkFsPermission(inode, snapshotId, access, mode);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Jan 31 22:21:19 2014
@@ -154,6 +154,31 @@ public abstract class INode implements I
     return nodeToUpdate;
   }
 
+  abstract AclFeature getAclFeature(int snapshotId);
+
+  @Override
+  public final AclFeature getAclFeature() {
+    return getAclFeature(Snapshot.CURRENT_STATE_ID);
+  }
+
+  abstract void addAclFeature(AclFeature aclFeature);
+
+  final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId)
+      throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
+    nodeToUpdate.addAclFeature(aclFeature);
+    return nodeToUpdate;
+  }
+
+  abstract void removeAclFeature();
+
+  final INode removeAclFeature(int latestSnapshotId)
+      throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
+    nodeToUpdate.removeAclFeature();
+    return nodeToUpdate;
+  }
+  
   /**
    * @return if the given snapshot id is {@link Snapshot#CURRENT_STATE_ID},
    *         return this; otherwise return the corresponding snapshot inode.

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeAttributes.java Fri Jan 31 22:21:19 2014
@@ -48,6 +48,9 @@ public interface INodeAttributes {
   /** @return the permission information as a long. */
   public long getPermissionLong();
 
+  /** @return the ACL feature. */
+  public AclFeature getAclFeature();
+
   /** @return the modification time. */
   public long getModificationTime();
 
@@ -58,13 +61,15 @@ public interface INodeAttributes {
   public static abstract class SnapshotCopy implements INodeAttributes {
     private final byte[] name;
     private final long permission;
+    private final AclFeature aclFeature;
     private final long modificationTime;
     private final long accessTime;
 
     SnapshotCopy(byte[] name, PermissionStatus permissions,
-        long modificationTime, long accessTime) {
+        AclFeature aclFeature, long modificationTime, long accessTime) {
       this.name = name;
       this.permission = PermissionStatusFormat.toLong(permissions);
+      this.aclFeature = aclFeature;
       this.modificationTime = modificationTime;
       this.accessTime = accessTime;
     }
@@ -72,6 +77,7 @@ public interface INodeAttributes {
     SnapshotCopy(INode inode) {
       this.name = inode.getLocalNameBytes();
       this.permission = inode.getPermissionLong();
+      this.aclFeature = inode.getAclFeature();
       this.modificationTime = inode.getModificationTime();
       this.accessTime = inode.getAccessTime();
     }
@@ -109,6 +115,11 @@ public interface INodeAttributes {
     }
 
     @Override
+    public AclFeature getAclFeature() {
+      return aclFeature;
+    }
+
+    @Override
     public final long getModificationTime() {
       return modificationTime;
     }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Jan 31 22:21:19 2014
@@ -77,8 +77,11 @@ public class INodeDirectory extends INod
    * @param other The INodeDirectory to be copied
    * @param adopt Indicate whether or not need to set the parent field of child
    *              INodes to the new node
+   * @param featuresToCopy any number of features to copy to the new node.
+   *              The method will do a reference copy, not a deep copy.
    */
-  public INodeDirectory(INodeDirectory other, boolean adopt, boolean copyFeatures) {
+  public INodeDirectory(INodeDirectory other, boolean adopt,
+      Feature... featuresToCopy) {
     super(other);
     this.children = other.children;
     if (adopt && this.children != null) {
@@ -86,9 +89,7 @@ public class INodeDirectory extends INod
         child.setParent(this);
       }
     }
-    if (copyFeatures) {
-      this.features = other.features;
-    }
+    this.features = featuresToCopy;
   }
 
   /** @return true unconditionally. */
@@ -231,7 +232,8 @@ public class INodeDirectory extends INod
   public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
     Preconditions.checkState(getClass() != INodeDirectory.class,
         "the class is already INodeDirectory, this=%s", this);
-    return replaceSelf(new INodeDirectory(this, true, true), inodeMap);
+    return replaceSelf(new INodeDirectory(this, true, this.getFeatures()),
+      inodeMap);
   }
 
   /** Replace itself with the given directory. */

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java Fri Jan 31 22:21:19 2014
@@ -35,8 +35,8 @@ public interface INodeDirectoryAttribute
   public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
       implements INodeDirectoryAttributes {
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
-        long modificationTime) {
-      super(name, permissions, modificationTime, 0L);
+        AclFeature aclFeature, long modificationTime) {
+      super(name, permissions, aclFeature, modificationTime, 0L);
     }
 
     public SnapshotCopy(INodeDirectory dir) {
@@ -62,8 +62,9 @@ public interface INodeDirectoryAttribute
 
 
     public CopyWithQuota(byte[] name, PermissionStatus permissions,
-        long modificationTime, long nsQuota, long dsQuota) {
-      super(name, permissions, modificationTime);
+        AclFeature aclFeature, long modificationTime, long nsQuota,
+        long dsQuota) {
+      super(name, permissions, aclFeature, modificationTime);
       this.nsQuota = nsQuota;
       this.dsQuota = dsQuota;
     }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java Fri Jan 31 22:21:19 2014
@@ -41,9 +41,9 @@ public interface INodeFileAttributes ext
     private final long header;
 
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
-        long modificationTime, long accessTime,
+        AclFeature aclFeature, long modificationTime, long accessTime,
         short replication, long preferredBlockSize) {
-      super(name, permissions, modificationTime, accessTime);
+      super(name, permissions, aclFeature, modificationTime, accessTime);
 
       final long h = HeaderFormat.combineReplication(0L, replication);
       header = HeaderFormat.combinePreferredBlockSize(h, preferredBlockSize);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Fri Jan 31 22:21:19 2014
@@ -213,6 +213,22 @@ public abstract class INodeReference ext
   public final FsPermission getFsPermission(int snapshotId) {
     return referred.getFsPermission(snapshotId);
   }
+
+  @Override
+  final AclFeature getAclFeature(int snapshotId) {
+    return referred.getAclFeature(snapshotId);
+  }
+
+  @Override
+  final void addAclFeature(AclFeature aclFeature) {
+    referred.addAclFeature(aclFeature);
+  }
+
+  @Override
+  final void removeAclFeature() {
+    referred.removeAclFeature();
+  }
+
   @Override
   public final short getFsPermissionShort() {
     return referred.getFsPermissionShort();

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java Fri Jan 31 22:21:19 2014
@@ -221,6 +221,15 @@ public abstract class INodeWithAdditiona
   }
 
   @Override
+  final AclFeature getAclFeature(int snapshotId) {
+    if (snapshotId != Snapshot.CURRENT_STATE_ID) {
+      return getSnapshotINode(snapshotId).getAclFeature();
+    }
+
+    return getFeature(AclFeature.class);
+  }
+
+  @Override
   final long getModificationTime(int snapshotId) {
     if (snapshotId != Snapshot.CURRENT_STATE_ID) {
       return getSnapshotINode(snapshotId).getModificationTime();
@@ -318,10 +327,6 @@ public abstract class INodeWithAdditiona
     return null;
   }
 
-  public AclFeature getAclFeature() {
-    return getFeature(AclFeature.class);
-  }
-
   public void removeAclFeature() {
     AclFeature f = getAclFeature();
     Preconditions.checkNotNull(f);
@@ -335,4 +340,8 @@ public abstract class INodeWithAdditiona
 
     addFeature(f);
   }
+
+  public final Feature[] getFeatures() {
+    return features;
+  }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Fri Jan 31 22:21:19 2014
@@ -184,7 +184,7 @@ public class INodeDirectorySnapshottable
   private int snapshotQuota = SNAPSHOT_LIMIT;
 
   public INodeDirectorySnapshottable(INodeDirectory dir) {
-    super(dir, true, true);
+    super(dir, true, dir.getFeatures());
     // add snapshot feature if the original directory does not have it
     if (!isWithSnapshot()) {
       addSnapshotFeature(null);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Fri Jan 31 22:21:19 2014
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Date;
 
@@ -28,12 +29,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /** Snapshot of a sub-tree in the namesystem. */
 @InterfaceAudience.Private
 public class Snapshot implements Comparable<byte[]> {
@@ -139,7 +144,10 @@ public class Snapshot implements Compara
   /** The root directory of the snapshot. */
   static public class Root extends INodeDirectory {
     Root(INodeDirectory other) {
-      super(other, false, false);
+      // Always preserve ACL.
+      super(other, false, Lists.newArrayList(
+        Iterables.filter(Arrays.asList(other.getFeatures()), AclFeature.class))
+        .toArray(new Feature[0]));
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java Fri Jan 31 22:21:19 2014
@@ -94,6 +94,13 @@ public class TestFSImageWithAcl {
     s = cluster.getNamesystem().getAclStatus(p.toString());
     returned = Lists.newArrayList(s.getEntries()).toArray(new AclEntry[0]);
     Assert.assertArrayEquals(new AclEntry[] { }, returned);
+
+    fs.modifyAclEntries(p, Lists.newArrayList(e));
+    s = cluster.getNamesystem().getAclStatus(p.toString());
+    returned = Lists.newArrayList(s.getEntries()).toArray(new AclEntry[0]);
+    Assert.assertArrayEquals(new AclEntry[] {
+        aclEntry(ACCESS, USER, "foo", READ_EXECUTE),
+        aclEntry(ACCESS, GROUP, READ) }, returned);
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java Fri Jan 31 22:21:19 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.permission.A
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -797,8 +798,7 @@ public class TestNameNodeAcl {
     INode inode = cluster.getNamesystem().getFSDirectory().getRoot()
       .getNode(path.toUri().getPath(), false);
     assertNotNull(inode);
-    assertTrue(inode instanceof INodeWithAdditionalFields);
-    AclFeature aclFeature = ((INodeWithAdditionalFields)inode).getAclFeature();
+    AclFeature aclFeature = inode.getAclFeature(Snapshot.CURRENT_STATE_ID);
     if (expectAclFeature) {
       assertNotNull(aclFeature);
     } else {

Added: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java?rev=1563304&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java (added)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java Fri Jan 31 22:21:19 2014
@@ -0,0 +1,776 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
+import static org.apache.hadoop.fs.permission.AclEntryScope.*;
+import static org.apache.hadoop.fs.permission.AclEntryType.*;
+import static org.apache.hadoop.fs.permission.FsAction.*;
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests interaction of ACLs with snapshots.
+ */
+public class TestAclWithSnapshot {
+  private static final UserGroupInformation BRUCE =
+    UserGroupInformation.createUserForTesting("bruce", new String[] { });
+  private static final UserGroupInformation DIANA =
+    UserGroupInformation.createUserForTesting("diana", new String[] { });
+
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static FileSystem fsAsBruce, fsAsDiana;
+  private static DistributedFileSystem hdfs;
+  private static int pathCount = 0;
+  private static Path path, snapshotPath;
+  private static String snapshotName;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new Configuration();
+    initCluster(true);
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    IOUtils.cleanup(null, hdfs, fsAsBruce, fsAsDiana);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() {
+    ++pathCount;
+    path = new Path("/p" + pathCount);
+    snapshotName = "snapshot" + pathCount;
+    snapshotPath = new Path(path, new Path(".snapshot", snapshotName));
+  }
+
+  @Test
+  public void testOriginalAclEnforcedForSnapshotRootAfterChange()
+      throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, NONE));
+    hdfs.setAcl(path, aclSpec);
+
+    assertDirPermissionGranted(fsAsBruce, BRUCE, path);
+    assertDirPermissionDenied(fsAsDiana, DIANA, path);
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    // Both original and snapshot still have same ACL.
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02750, path);
+
+    s = hdfs.getAclStatus(snapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02750, snapshotPath);
+
+    assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
+
+    aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, READ_EXECUTE),
+      aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, NONE));
+    hdfs.setAcl(path, aclSpec);
+
+    // Original has changed, but snapshot still has old ACL.
+    doSnapshotRootChangeAssertions(path, snapshotPath);
+    restart(false);
+    doSnapshotRootChangeAssertions(path, snapshotPath);
+    restart(true);
+    doSnapshotRootChangeAssertions(path, snapshotPath);
+  }
+
+  private static void doSnapshotRootChangeAssertions(Path path,
+      Path snapshotPath) throws Exception {
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02550, path);
+
+    s = hdfs.getAclStatus(snapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02750, snapshotPath);
+
+    assertDirPermissionDenied(fsAsBruce, BRUCE, path);
+    assertDirPermissionGranted(fsAsDiana, DIANA, path);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
+  }
+
+  @Test
+  public void testOriginalAclEnforcedForSnapshotContentsAfterChange()
+      throws Exception {
+    Path filePath = new Path(path, "file1");
+    Path subdirPath = new Path(path, "subdir1");
+    Path fileSnapshotPath = new Path(snapshotPath, "file1");
+    Path subdirSnapshotPath = new Path(snapshotPath, "subdir1");
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0777));
+    FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
+      .close();
+    FileSystem.mkdirs(hdfs, subdirPath, FsPermission.createImmutable(
+      (short)0700));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, READ_EXECUTE),
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, NONE));
+    hdfs.setAcl(filePath, aclSpec);
+    hdfs.setAcl(subdirPath, aclSpec);
+
+    assertFilePermissionGranted(fsAsBruce, BRUCE, filePath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, filePath);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, subdirPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirPath);
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    // Both original and snapshot still have same ACL.
+    AclEntry[] expected = new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) };
+    AclStatus s = hdfs.getAclStatus(filePath);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, filePath);
+
+    s = hdfs.getAclStatus(subdirPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, subdirPath);
+
+    s = hdfs.getAclStatus(fileSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, fileSnapshotPath);
+    assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
+
+    s = hdfs.getAclStatus(subdirSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, subdirSnapshotPath);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
+
+    aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, READ_EXECUTE),
+      aclEntry(ACCESS, USER, "diana", ALL),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, NONE));
+    hdfs.setAcl(filePath, aclSpec);
+    hdfs.setAcl(subdirPath, aclSpec);
+
+    // Original has changed, but snapshot still has old ACL.
+    doSnapshotContentsChangeAssertions(filePath, fileSnapshotPath, subdirPath,
+      subdirSnapshotPath);
+    restart(false);
+    doSnapshotContentsChangeAssertions(filePath, fileSnapshotPath, subdirPath,
+      subdirSnapshotPath);
+    restart(true);
+    doSnapshotContentsChangeAssertions(filePath, fileSnapshotPath, subdirPath,
+      subdirSnapshotPath);
+  }
+
+  private static void doSnapshotContentsChangeAssertions(Path filePath,
+      Path fileSnapshotPath, Path subdirPath, Path subdirSnapshotPath)
+      throws Exception {
+    AclEntry[] expected = new AclEntry[] {
+      aclEntry(ACCESS, USER, "diana", ALL),
+      aclEntry(ACCESS, GROUP, NONE) };
+    AclStatus s = hdfs.getAclStatus(filePath);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02570, filePath);
+    assertFilePermissionDenied(fsAsBruce, BRUCE, filePath);
+    assertFilePermissionGranted(fsAsDiana, DIANA, filePath);
+
+    s = hdfs.getAclStatus(subdirPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02570, subdirPath);
+    assertDirPermissionDenied(fsAsBruce, BRUCE, subdirPath);
+    assertDirPermissionGranted(fsAsDiana, DIANA, subdirPath);
+
+    expected = new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) };
+    s = hdfs.getAclStatus(fileSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, fileSnapshotPath);
+    assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
+
+    s = hdfs.getAclStatus(subdirSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, subdirSnapshotPath);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
+  }
+
+  @Test
+  public void testOriginalAclEnforcedForSnapshotRootAfterRemoval()
+      throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, NONE));
+    hdfs.setAcl(path, aclSpec);
+
+    assertDirPermissionGranted(fsAsBruce, BRUCE, path);
+    assertDirPermissionDenied(fsAsDiana, DIANA, path);
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    // Both original and snapshot still have same ACL.
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02750, path);
+
+    s = hdfs.getAclStatus(snapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02750, snapshotPath);
+
+    assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
+
+    hdfs.removeAcl(path);
+
+    // Original has changed, but snapshot still has old ACL.
+    doSnapshotRootRemovalAssertions(path, snapshotPath);
+    restart(false);
+    doSnapshotRootRemovalAssertions(path, snapshotPath);
+    restart(true);
+    doSnapshotRootRemovalAssertions(path, snapshotPath);
+  }
+
+  private static void doSnapshotRootRemovalAssertions(Path path,
+      Path snapshotPath) throws Exception {
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] { }, returned);
+    assertPermission((short)0700, path);
+
+    s = hdfs.getAclStatus(snapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02750, snapshotPath);
+
+    assertDirPermissionDenied(fsAsBruce, BRUCE, path);
+    assertDirPermissionDenied(fsAsDiana, DIANA, path);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
+  }
+
+  /**
+   * Tests that removing the ACLs from a file and a subdirectory under a
+   * snapshotted directory changes only the originals: the copies reachable
+   * through the snapshot path must keep the old ACL, and that old ACL must
+   * still be enforced there.  Re-checks after a restart without and with a
+   * new checkpoint, to cover edit-log replay and fsimage load.
+   */
+  @Test
+  public void testOriginalAclEnforcedForSnapshotContentsAfterRemoval()
+      throws Exception {
+    Path filePath = new Path(path, "file1");
+    Path subdirPath = new Path(path, "subdir1");
+    Path fileSnapshotPath = new Path(snapshotPath, "file1");
+    Path subdirSnapshotPath = new Path(snapshotPath, "subdir1");
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0777));
+    FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
+      .close();
+    FileSystem.mkdirs(hdfs, subdirPath, FsPermission.createImmutable(
+      (short)0700));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, READ_EXECUTE),
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, NONE));
+    hdfs.setAcl(filePath, aclSpec);
+    hdfs.setAcl(subdirPath, aclSpec);
+
+    // Sanity-check enforcement on the originals before snapshotting.
+    assertFilePermissionGranted(fsAsBruce, BRUCE, filePath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, filePath);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, subdirPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirPath);
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    // Both original and snapshot still have same ACL.
+    AclEntry[] expected = new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) };
+    AclStatus s = hdfs.getAclStatus(filePath);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, filePath);
+
+    s = hdfs.getAclStatus(subdirPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, subdirPath);
+
+    s = hdfs.getAclStatus(fileSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, fileSnapshotPath);
+    assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
+
+    s = hdfs.getAclStatus(subdirSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, subdirSnapshotPath);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
+
+    hdfs.removeAcl(filePath);
+    hdfs.removeAcl(subdirPath);
+
+    // Original has changed, but snapshot still has old ACL.
+    doSnapshotContentsRemovalAssertions(filePath, fileSnapshotPath, subdirPath,
+      subdirSnapshotPath);
+    restart(false);
+    doSnapshotContentsRemovalAssertions(filePath, fileSnapshotPath, subdirPath,
+      subdirSnapshotPath);
+    restart(true);
+    doSnapshotContentsRemovalAssertions(filePath, fileSnapshotPath, subdirPath,
+      subdirSnapshotPath);
+  }
+
+  /**
+   * Asserts that the original file and subdirectory no longer have ACLs
+   * while their snapshot copies keep the old ACL, both in getAclStatus
+   * output and in permission enforcement.
+   *
+   * @param filePath Path to the original file (ACL removed)
+   * @param fileSnapshotPath Path to the snapshot copy of the file
+   * @param subdirPath Path to the original subdirectory (ACL removed)
+   * @param subdirSnapshotPath Path to the snapshot copy of the subdirectory
+   * @throws Exception if there is an unexpected error
+   */
+  private static void doSnapshotContentsRemovalAssertions(Path filePath,
+      Path fileSnapshotPath, Path subdirPath, Path subdirSnapshotPath)
+      throws Exception {
+    // Originals: ACLs removed, only plain permission bits remain, so
+    // neither test user has access.
+    AclEntry[] expected = new AclEntry[] { };
+    AclStatus s = hdfs.getAclStatus(filePath);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)0500, filePath);
+    assertFilePermissionDenied(fsAsBruce, BRUCE, filePath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, filePath);
+
+    s = hdfs.getAclStatus(subdirPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)0500, subdirPath);
+    assertDirPermissionDenied(fsAsBruce, BRUCE, subdirPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirPath);
+
+    // Snapshot copies: still carry the ACL captured at snapshot time, so
+    // bruce keeps access through the snapshot paths.
+    expected = new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) };
+    s = hdfs.getAclStatus(fileSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, fileSnapshotPath);
+    assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
+    assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
+
+    s = hdfs.getAclStatus(subdirSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02550, subdirSnapshotPath);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
+    assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
+  }
+
+  /**
+   * Tests that a modifyAclEntries call issued after a snapshot is applied on
+   * top of the current ACL (including an earlier post-snapshot
+   * modification), not on top of the state captured in the snapshot.
+   */
+  @Test
+  public void testModifyReadsCurrentState() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    // First post-snapshot modification.
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "bruce", ALL));
+    hdfs.modifyAclEntries(path, aclSpec);
+
+    // Second modification must merge with the first, not replace it.
+    aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "diana", READ_EXECUTE));
+    hdfs.modifyAclEntries(path, aclSpec);
+
+    AclEntry[] expected = new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", ALL),
+      aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, NONE) };
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)02770, path);
+    assertDirPermissionGranted(fsAsBruce, BRUCE, path);
+    assertDirPermissionGranted(fsAsDiana, DIANA, path);
+  }
+
+  /**
+   * Tests that removeAcl issued after a snapshot removes the current ACL,
+   * including entries added after the snapshot was taken, leaving only the
+   * plain permission bits in effect on the original path.
+   */
+  @Test
+  public void testRemoveReadsCurrentState() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    // Add an entry after the snapshot, then remove the whole ACL.
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "bruce", ALL));
+    hdfs.modifyAclEntries(path, aclSpec);
+
+    hdfs.removeAcl(path);
+
+    AclEntry[] expected = new AclEntry[] { };
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expected, returned);
+    assertPermission((short)0700, path);
+    assertDirPermissionDenied(fsAsBruce, BRUCE, path);
+    assertDirPermissionDenied(fsAsDiana, DIANA, path);
+  }
+
+  /**
+   * Tests that creating a snapshot does not copy the directory's default ACL
+   * into an access ACL: both the original and the snapshot report only
+   * default-scoped entries, and bruce's default entry grants him no access
+   * to the snapshot itself.
+   */
+  @Test
+  public void testDefaultAclNotCopiedToAccessAclOfNewSnapshot()
+      throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE));
+    hdfs.modifyAclEntries(path, aclSpec);
+
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+
+    AclStatus s = hdfs.getAclStatus(path);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(DEFAULT, USER, ALL),
+      aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, NONE),
+      aclEntry(DEFAULT, MASK, READ_EXECUTE),
+      aclEntry(DEFAULT, OTHER, NONE) }, returned);
+    assertPermission((short)02700, path);
+
+    // Snapshot reports the same default-only ACL.
+    s = hdfs.getAclStatus(snapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(DEFAULT, USER, ALL),
+      aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, NONE),
+      aclEntry(DEFAULT, MASK, READ_EXECUTE),
+      aclEntry(DEFAULT, OTHER, NONE) }, returned);
+    assertPermission((short)02700, snapshotPath);
+
+    assertDirPermissionDenied(fsAsBruce, BRUCE, snapshotPath);
+  }
+
+  /**
+   * Tests that modifyAclEntries on a path under ".snapshot" is rejected with
+   * SnapshotAccessControlException, since snapshot contents are read-only.
+   */
+  @Test
+  public void testModifyAclEntriesSnapshotPath() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(DEFAULT, USER, "bruce", READ_EXECUTE));
+    exception.expect(SnapshotAccessControlException.class);
+    hdfs.modifyAclEntries(snapshotPath, aclSpec);
+  }
+
+  /**
+   * Tests that removeAclEntries on a path under ".snapshot" is rejected with
+   * SnapshotAccessControlException, since snapshot contents are read-only.
+   */
+  @Test
+  public void testRemoveAclEntriesSnapshotPath() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(DEFAULT, USER, "bruce"));
+    exception.expect(SnapshotAccessControlException.class);
+    hdfs.removeAclEntries(snapshotPath, aclSpec);
+  }
+
+  /**
+   * Tests that removeDefaultAcl on a path under ".snapshot" is rejected with
+   * SnapshotAccessControlException, since snapshot contents are read-only.
+   */
+  @Test
+  public void testRemoveDefaultAclSnapshotPath() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+    exception.expect(SnapshotAccessControlException.class);
+    hdfs.removeDefaultAcl(snapshotPath);
+  }
+
+  /**
+   * Tests that removeAcl on a path under ".snapshot" is rejected with
+   * SnapshotAccessControlException, since snapshot contents are read-only.
+   */
+  @Test
+  public void testRemoveAclSnapshotPath() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+    exception.expect(SnapshotAccessControlException.class);
+    hdfs.removeAcl(snapshotPath);
+  }
+
+  /**
+   * Tests that setAcl on a path under ".snapshot" is rejected with
+   * SnapshotAccessControlException, since snapshot contents are read-only.
+   */
+  @Test
+  public void testSetAclSnapshotPath() throws Exception {
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0700));
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(DEFAULT, USER, "bruce"));
+    exception.expect(SnapshotAccessControlException.class);
+    hdfs.setAcl(snapshotPath, aclSpec);
+  }
+
+  @Test
+  public void testChangeAclExceedsQuota() throws Exception {
+    Path filePath = new Path(path, "file1");
+    Path fileSnapshotPath = new Path(snapshotPath, "file1");
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0755));
+    hdfs.allowSnapshot(path);
+    hdfs.setQuota(path, 3, HdfsConstants.QUOTA_DONT_SET);
+    FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
+      .close();
+    hdfs.setPermission(filePath, FsPermission.createImmutable((short)0600));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "bruce", READ_WRITE));
+    hdfs.modifyAclEntries(filePath, aclSpec);
+
+    hdfs.createSnapshot(path, snapshotName);
+
+    AclStatus s = hdfs.getAclStatus(filePath);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_WRITE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02660, filePath);
+
+    s = hdfs.getAclStatus(fileSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_WRITE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02660, filePath);
+
+    aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "bruce", READ));
+    exception.expect(NSQuotaExceededException.class);
+    hdfs.modifyAclEntries(filePath, aclSpec);
+  }
+
+  @Test
+  public void testRemoveAclExceedsQuota() throws Exception {
+    Path filePath = new Path(path, "file1");
+    Path fileSnapshotPath = new Path(snapshotPath, "file1");
+    FileSystem.mkdirs(hdfs, path, FsPermission.createImmutable((short)0755));
+    hdfs.allowSnapshot(path);
+    hdfs.setQuota(path, 3, HdfsConstants.QUOTA_DONT_SET);
+    FileSystem.create(hdfs, filePath, FsPermission.createImmutable((short)0600))
+      .close();
+    hdfs.setPermission(filePath, FsPermission.createImmutable((short)0600));
+    List<AclEntry> aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "bruce", READ_WRITE));
+    hdfs.modifyAclEntries(filePath, aclSpec);
+
+    hdfs.createSnapshot(path, snapshotName);
+
+    AclStatus s = hdfs.getAclStatus(filePath);
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_WRITE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02660, filePath);
+
+    s = hdfs.getAclStatus(fileSnapshotPath);
+    returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] {
+      aclEntry(ACCESS, USER, "bruce", READ_WRITE),
+      aclEntry(ACCESS, GROUP, NONE) }, returned);
+    assertPermission((short)02660, filePath);
+
+    aclSpec = Lists.newArrayList(
+      aclEntry(ACCESS, USER, "bruce", READ));
+    exception.expect(NSQuotaExceededException.class);
+    hdfs.removeAcl(filePath);
+  }
+
+  /**
+   * Tests that getAclStatus succeeds on the ".snapshot" pseudo-directory
+   * itself and reports an empty ACL.
+   */
+  @Test
+  public void testGetAclStatusDotSnapshotPath() throws Exception {
+    hdfs.mkdirs(path);
+    SnapshotTestHelper.createSnapshot(hdfs, path, snapshotName);
+    AclStatus s = hdfs.getAclStatus(new Path(path, ".snapshot"));
+    AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(new AclEntry[] { }, returned);
+  }
+
+  /**
+   * Checks that the given user's FileSystem cannot list the given directory,
+   * failing the test if access is unexpectedly granted.
+   *
+   * @param fs FileSystem bound to the user being checked
+   * @param user UserGroupInformation owner of fs, used in the failure message
+   * @param pathToCheck Path of the directory to list
+   * @throws Exception if there is an unexpected error
+   */
+  private static void assertDirPermissionDenied(FileSystem fs,
+      UserGroupInformation user, Path pathToCheck) throws Exception {
+    boolean denied = false;
+    try {
+      fs.listStatus(pathToCheck);
+    } catch (AccessControlException e) {
+      denied = true;
+    }
+    if (!denied) {
+      fail("expected AccessControlException for user " + user + ", path = " +
+        pathToCheck);
+    }
+  }
+
+  /**
+   * Checks that the given user's FileSystem can list the given directory,
+   * failing the test if access is denied.
+   *
+   * @param fs FileSystem bound to the user being checked
+   * @param user UserGroupInformation owner of fs, used in the failure message
+   * @param pathToCheck Path of the directory to list
+   * @throws Exception if there is an unexpected error
+   */
+  private static void assertDirPermissionGranted(FileSystem fs,
+      UserGroupInformation user, Path pathToCheck) throws Exception {
+    String denialMessage = "expected permission granted for user " + user +
+      ", path = " + pathToCheck;
+    try {
+      fs.listStatus(pathToCheck);
+    } catch (AccessControlException e) {
+      fail(denialMessage);
+    }
+  }
+
+  /**
+   * Checks that the given user's FileSystem cannot open the given file,
+   * failing the test if access is unexpectedly granted.
+   *
+   * @param fs FileSystem bound to the user being checked
+   * @param user UserGroupInformation owner of fs, used in the failure message
+   * @param pathToCheck Path of the file to open
+   * @throws Exception if there is an unexpected error
+   */
+  private static void assertFilePermissionDenied(FileSystem fs,
+      UserGroupInformation user, Path pathToCheck) throws Exception {
+    boolean denied = false;
+    try {
+      fs.open(pathToCheck).close();
+    } catch (AccessControlException e) {
+      denied = true;
+    }
+    if (!denied) {
+      fail("expected AccessControlException for user " + user + ", path = " +
+        pathToCheck);
+    }
+  }
+
+  /**
+   * Checks that the given user's FileSystem can open the given file, failing
+   * the test if access is denied.
+   *
+   * @param fs FileSystem bound to the user being checked
+   * @param user UserGroupInformation owner of fs, used in the failure message
+   * @param pathToCheck Path of the file to open
+   * @throws Exception if there is an unexpected error
+   */
+  private static void assertFilePermissionGranted(FileSystem fs,
+      UserGroupInformation user, Path pathToCheck) throws Exception {
+    String denialMessage = "expected permission granted for user " + user +
+      ", path = " + pathToCheck;
+    try {
+      fs.open(pathToCheck).close();
+    } catch (AccessControlException e) {
+      fail(denialMessage);
+    }
+  }
+
+  /**
+   * Verifies that the inode at the given path carries exactly the expected
+   * FsPermission bits.
+   *
+   * @param perm short expected permission bits
+   * @param pathToCheck Path whose permission is verified
+   * @throws Exception thrown if there is an unexpected error
+   */
+  private static void assertPermission(short perm, Path pathToCheck)
+      throws Exception {
+    FsPermission expectedPerm = FsPermission.createImmutable(perm);
+    FsPermission actualPerm =
+      hdfs.getFileStatus(pathToCheck).getPermission();
+    assertEquals(expectedPerm, actualPerm);
+  }
+
+  /**
+   * Brings up a MiniDFSCluster with a single DataNode, waits until it is
+   * active, and caches FileSystem handles for the superuser and both test
+   * users.
+   *
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @throws Exception if any step fails
+   */
+  private static void initCluster(boolean format) throws Exception {
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(1)
+      .format(format);
+    cluster = builder.build();
+    cluster.waitActive();
+    hdfs = cluster.getFileSystem();
+    fsAsBruce = DFSTestUtil.getFileSystemAs(BRUCE, conf);
+    fsAsDiana = DFSTestUtil.getFileSystemAs(DIANA, conf);
+  }
+
+  /**
+   * Restart the cluster, optionally saving a new checkpoint.
+   *
+   * @param checkpoint boolean true to save a new checkpoint
+   * @throws Exception if restart fails
+   */
+  private static void restart(boolean checkpoint) throws Exception {
+    NameNode nameNode = cluster.getNameNode();
+    if (checkpoint) {
+      // saveNamespace is only permitted while the NameNode is in safe mode,
+      // so enter safe mode first.
+      NameNodeAdapter.enterSafeMode(nameNode, false);
+      NameNodeAdapter.saveNamespace(nameNode);
+    }
+    shutdown();
+    // Do not re-format, so the restarted cluster reloads the saved namespace.
+    initCluster(false);
+  }
+}

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDiff.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDiff.java?rev=1563304&r1=1563303&r2=1563304&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDiff.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDiff.java Fri Jan 31 22:21:19 2014
@@ -305,7 +305,8 @@ public class TestDiff {
     final int i = Diff.search(current, inode.getKey());
     Assert.assertTrue(i >= 0);
     final INodeDirectory oldinode = (INodeDirectory)current.get(i);
-    final INodeDirectory newinode = new INodeDirectory(oldinode, false, true);
+    final INodeDirectory newinode = new INodeDirectory(oldinode, false,
+      oldinode.getFeatures());
     newinode.setModificationTime(oldinode.getModificationTime() + 1);
 
     current.set(i, newinode);



Mime
View raw message