hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject svn commit: r1559267 [1/2] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/ap...
Date Fri, 17 Jan 2014 22:05:47 GMT
Author: wheat9
Date: Fri Jan 17 22:05:47 2014
New Revision: 1559267

URL: http://svn.apache.org/r1559267
Log:
HDFS-5758. NameNode: complete implementation of inode modifications for ACLs. Contributed by Chris Nauroth.

Added:
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ScopedAclEntries.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
Modified:
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclTransformation.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java
    hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeAcl.java

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4685.txt Fri Jan 17 22:05:47 2014
@@ -30,6 +30,9 @@ HDFS-4685 (Unreleased)
 
     HADOOP-10220. Add ACL indicator bit to FsPermission. (cnauroth)
 
+    HDFS-5758. NameNode: complete implementation of inode modifications for
+    ACLs. (Chris Nauroth via wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 17 22:05:47 2014
@@ -111,6 +111,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
@@ -2648,10 +2649,11 @@ public class DFSClient implements java.i
       namenode.modifyAclEntries(src, aclSpec);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     AclException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class,
-                                     SnapshotAccessControlException.class);
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
     }
   }
 
@@ -2662,10 +2664,11 @@ public class DFSClient implements java.i
       namenode.removeAclEntries(src, aclSpec);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     AclException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class,
-                                     SnapshotAccessControlException.class);
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
     }
   }
 
@@ -2675,10 +2678,11 @@ public class DFSClient implements java.i
       namenode.removeDefaultAcl(src);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     AclException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class,
-                                     SnapshotAccessControlException.class);
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
     }
   }
 
@@ -2688,10 +2692,11 @@ public class DFSClient implements java.i
       namenode.removeAcl(src);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     AclException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class,
-                                     SnapshotAccessControlException.class);
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
     }
   }
 
@@ -2701,10 +2706,11 @@ public class DFSClient implements java.i
       namenode.setAcl(src, aclSpec);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     AclException.class,
                                      FileNotFoundException.class,
                                      SafeModeException.class,
-                                     UnresolvedPathException.class,
-                                     SnapshotAccessControlException.class);
+                                     SnapshotAccessControlException.class,
+                                     UnresolvedPathException.class);
     }
   }
 

Added: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java?rev=1559267&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java (added)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java Fri Jan 17 22:05:47 2014
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.AclException;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+
+/**
+ * AclStorage contains utility methods that define how ACL data is stored in the
+ * namespace.
+ *
+ * If an inode has an ACL, then the ACL bit is set in the inode's
+ * {@link FsPermission} and the inode also contains an {@link AclFeature}.  For
+ * the access ACL, the owner and other entries are identical to the owner and
+ * other bits stored in FsPermission, so we reuse those.  The access mask entry
+ * is stored into the group permission bits of FsPermission.  This is consistent
+ * with other file systems' implementations of ACLs and eliminates the need for
+ * special handling in various parts of the codebase.  For example, if a user
+ * calls chmod to change group permission bits on a file with an ACL, then the
+ * expected behavior is to change the ACL's mask entry.  By saving the mask entry
+ * into the group permission bits, chmod continues to work correctly without
+ * special handling.  All remaining access entries (named users and named groups)
+ * are stored as explicit {@link AclEntry} instances in a list inside the
+ * AclFeature.  Additionally, all default entries are stored in the AclFeature.
+ *
+ * The methods in this class encapsulate these rules for reading or writing the
+ * ACL entries to the appropriate location.
+ *
+ * The methods in this class assume that input ACL entry lists have already been
+ * validated and sorted according to the rules enforced by
+ * {@link AclTransformation}.
+ */
+@InterfaceAudience.Private
+final class AclStorage {
+
+  /**
+   * Reads the existing extended ACL entries of an inode.  This method returns
+   * only the extended ACL entries stored in the AclFeature.  If the inode does
+   * not have an ACL, then this method returns an empty list.
+   *
+   * @param inode INodeWithAdditionalFields to read
+   * @param snapshotId int latest snapshot ID of inode
+   * @return List<AclEntry> containing extended inode ACL entries
+   */
+  public static List<AclEntry> readINodeAcl(INodeWithAdditionalFields inode,
+      int snapshotId) {
+    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+    if (perm.getAclBit()) {
+      return inode.getAclFeature().getEntries();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Reads the existing ACL of an inode.  This method always returns the full
+   * logical ACL of the inode after reading relevant data from the inode's
+   * {@link FsPermission} and {@link AclFeature}.  Note that every inode
+   * logically has an ACL, even if no ACL has been set explicitly.  If the inode
+   * does not have an extended ACL, then the result is a minimal ACL consisting of
+   * exactly 3 entries that correspond to the owner, group and other permissions.
+   *
+   * @param inode INodeWithAdditionalFields to read
+   * @param snapshotId int latest snapshot ID of inode
+   * @return List<AclEntry> containing all logical inode ACL entries
+   */
+  public static List<AclEntry> readINodeLogicalAcl(
+      INodeWithAdditionalFields inode, int snapshotId) {
+    final List<AclEntry> existingAcl;
+    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+    if (perm.getAclBit()) {
+      // Split ACL entries stored in the feature into access vs. default.
+      List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
+      ScopedAclEntries scoped = new ScopedAclEntries(featureEntries);
+      List<AclEntry> accessEntries = scoped.getAccessEntries();
+      List<AclEntry> defaultEntries = scoped.getDefaultEntries();
+
+      // Pre-allocate list size for the explicit entries stored in the feature
+      // plus the 3 implicit entries (owner, group and other) from the permission
+      // bits.
+      existingAcl = Lists.newArrayListWithCapacity(featureEntries.size() + 3);
+
+      if (!accessEntries.isEmpty()) {
+        // Add owner entry implied from user permission bits.
+        existingAcl.add(new AclEntry.Builder()
+          .setScope(AclEntryScope.ACCESS)
+          .setType(AclEntryType.USER)
+          .setPermission(perm.getUserAction())
+          .build());
+
+        // Next add all named user and group entries taken from the feature.
+        existingAcl.addAll(accessEntries);
+
+        // Add mask entry implied from group permission bits.
+        existingAcl.add(new AclEntry.Builder()
+          .setScope(AclEntryScope.ACCESS)
+          .setType(AclEntryType.MASK)
+          .setPermission(perm.getGroupAction())
+          .build());
+
+        // Add other entry implied from other permission bits.
+        existingAcl.add(new AclEntry.Builder()
+          .setScope(AclEntryScope.ACCESS)
+          .setType(AclEntryType.OTHER)
+          .setPermission(perm.getOtherAction())
+          .build());
+      } else {
+        // It's possible that there is a default ACL but no access ACL.  In this
+        // case, add the minimal access ACL implied by the permission bits.
+        existingAcl.addAll(getMinimalAcl(perm));
+      }
+
+      // Add all default entries after the access entries.
+      existingAcl.addAll(defaultEntries);
+    } else {
+      // If the inode doesn't have an extended ACL, then return a minimal ACL.
+      existingAcl = getMinimalAcl(perm);
+    }
+
+    // The above adds entries in the correct order, so no need to sort here.
+    return existingAcl;
+  }
+
+  /**
+   * Completely removes the ACL from an inode.
+   *
+   * @param inode INodeWithAdditionalFields to update
+   * @param snapshotId int latest snapshot ID of inode
+   * @throws QuotaExceededException if quota limit is exceeded
+   */
+  public static void removeINodeAcl(INodeWithAdditionalFields inode,
+      int snapshotId) throws QuotaExceededException {
+    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+    if (perm.getAclBit()) {
+      // Restore group permissions from the feature's entry to permission bits,
+      // overwriting the mask, which is not part of a minimal ACL.
+      List<AclEntry> featureEntries = inode.getAclFeature().getEntries();
+      AclEntry groupEntryKey = new AclEntry.Builder()
+        .setScope(AclEntryScope.ACCESS)
+        .setType(AclEntryType.GROUP)
+        .build();
+      int groupEntryIndex = Collections.binarySearch(featureEntries,
+        groupEntryKey, AclTransformation.ACL_ENTRY_COMPARATOR);
+      assert groupEntryIndex >= 0;
+
+      // Remove the feature and turn off the ACL bit.
+      inode.removeAclFeature();
+      FsPermission newPerm = new FsPermission(perm.getUserAction(),
+        featureEntries.get(groupEntryIndex).getPermission(),
+        perm.getOtherAction(),
+        perm.getStickyBit(), false);
+      inode.setPermission(newPerm, snapshotId);
+    }
+  }
+
+  /**
+   * Updates an inode with a new ACL.  This method takes a full logical ACL and
+   * stores the entries to the inode's {@link FsPermission} and
+   * {@link AclFeature}.
+   *
+   * @param inode INodeWithAdditionalFields to update
+   * @param newAcl List<AclEntry> containing new ACL entries
+   * @param snapshotId int latest snapshot ID of inode
+   * @throws AclException if the ACL is invalid for the given inode
+   * @throws QuotaExceededException if quota limit is exceeded
+   */
+  public static void updateINodeAcl(INodeWithAdditionalFields inode,
+      List<AclEntry> newAcl, int snapshotId) throws AclException,
+      QuotaExceededException {
+    assert newAcl.size() >= 3;
+    FsPermission perm = inode.getPermissionStatus(snapshotId).getPermission();
+    final FsPermission newPerm;
+    if (newAcl.size() > 3) {
+      // This is an extended ACL.  Split entries into access vs. default.
+      ScopedAclEntries scoped = new ScopedAclEntries(newAcl);
+      List<AclEntry> accessEntries = scoped.getAccessEntries();
+      List<AclEntry> defaultEntries = scoped.getDefaultEntries();
+
+      // Only directories may have a default ACL.
+      if (!defaultEntries.isEmpty() && !inode.isDirectory()) {
+        throw new AclException(
+          "Invalid ACL: only directories may have a default ACL.");
+      }
+
+      // Pre-allocate list size for the explicit entries stored in the feature,
+      // which is all entries minus the 3 entries implicitly stored in the
+      // permission bits.
+      List<AclEntry> featureEntries = Lists.newArrayListWithCapacity(
+        (accessEntries.size() - 3) + defaultEntries.size());
+
+      // Calculate new permission bits.  For a correctly sorted ACL, the first
+      // entry is the owner and the last 2 entries are the mask and other entries
+      // respectively.  Also preserve sticky bit and toggle ACL bit on.
+      newPerm = new FsPermission(accessEntries.get(0).getPermission(),
+        accessEntries.get(accessEntries.size() - 2).getPermission(),
+        accessEntries.get(accessEntries.size() - 1).getPermission(),
+        perm.getStickyBit(), true);
+
+      // For the access ACL, the feature only needs to hold the named user and
+      // group entries.  For a correctly sorted ACL, these will be in a
+      // predictable range.
+      if (accessEntries.size() > 3) {
+        featureEntries.addAll(
+          accessEntries.subList(1, accessEntries.size() - 2));
+      }
+
+      // Add all default entries to the feature.
+      featureEntries.addAll(defaultEntries);
+
+      // Attach entries to the feature, creating a new feature if needed.
+      AclFeature aclFeature = inode.getAclFeature();
+      if (aclFeature == null) {
+        aclFeature = new AclFeature();
+        inode.addAclFeature(aclFeature);
+      }
+      aclFeature.setEntries(featureEntries);
+    } else {
+      // This is a minimal ACL.  Remove the ACL feature if it previously had one.
+      if (perm.getAclBit()) {
+        inode.removeAclFeature();
+      }
+
+      // Calculate new permission bits.  For a correctly sorted ACL, the owner,
+      // group and other permissions are in order.  Also preserve sticky bit and
+      // toggle ACL bit off.
+      newPerm = new FsPermission(newAcl.get(0).getPermission(),
+        newAcl.get(1).getPermission(),
+        newAcl.get(2).getPermission(),
+        perm.getStickyBit(), false);
+    }
+
+    inode.setPermission(newPerm, snapshotId);
+  }
+
+  /**
+   * There is no reason to instantiate this class.
+   */
+  private AclStorage() {
+  }
+
+  /**
+   * Translates the given permission bits to the equivalent minimal ACL.
+   *
+   * @param perm FsPermission to translate
+   * @return List<AclEntry> containing exactly 3 entries representing the owner,
+   *   group and other permissions
+   */
+  private static List<AclEntry> getMinimalAcl(FsPermission perm) {
+    return Lists.newArrayList(
+      new AclEntry.Builder()
+        .setScope(AclEntryScope.ACCESS)
+        .setType(AclEntryType.USER)
+        .setPermission(perm.getUserAction())
+        .build(),
+      new AclEntry.Builder()
+        .setScope(AclEntryScope.ACCESS)
+        .setType(AclEntryType.GROUP)
+        .setPermission(perm.getGroupAction())
+        .build(),
+      new AclEntry.Builder()
+        .setScope(AclEntryScope.ACCESS)
+        .setType(AclEntryType.OTHER)
+        .setPermission(perm.getOtherAction())
+        .build());
+  }
+}

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java Fri Jan 17 22:05:47 2014
@@ -62,7 +62,6 @@ import org.apache.hadoop.hdfs.protocol.A
 @InterfaceAudience.Private
 final class AclTransformation {
   private static final int MAX_ENTRIES = 32;
-  private static final int PIVOT_NOT_FOUND = -1;
 
   /**
    * Filters (discards) any existing ACL entries that have the same scope, type
@@ -246,7 +245,7 @@ final class AclTransformation {
    * -other entry
    * All access ACL entries sort ahead of all default ACL entries.
    */
-  private static final Comparator<AclEntry> ACL_ENTRY_COMPARATOR =
+  static final Comparator<AclEntry> ACL_ENTRY_COMPARATOR =
     new Comparator<AclEntry>() {
       @Override
       public int compare(AclEntry entry1, AclEntry entry2) {
@@ -294,20 +293,20 @@ final class AclTransformation {
     }
     // Search for the required base access entries.  If there is a default ACL,
     // then do the same check on the default entries.
-    int pivot = calculatePivotOnDefaultEntries(aclBuilder);
+    ScopedAclEntries scopedEntries = new ScopedAclEntries(aclBuilder);
     for (AclEntryType type: EnumSet.of(USER, GROUP, OTHER)) {
       AclEntry accessEntryKey = new AclEntry.Builder().setScope(ACCESS)
         .setType(type).build();
-      if (Collections.binarySearch(aclBuilder, accessEntryKey,
-          ACL_ENTRY_COMPARATOR) < 0) {
+      if (Collections.binarySearch(scopedEntries.getAccessEntries(),
+          accessEntryKey, ACL_ENTRY_COMPARATOR) < 0) {
         throw new AclException(
           "Invalid ACL: the user, group and other entries are required.");
       }
-      if (pivot != PIVOT_NOT_FOUND) {
+      if (!scopedEntries.getDefaultEntries().isEmpty()) {
         AclEntry defaultEntryKey = new AclEntry.Builder().setScope(DEFAULT)
           .setType(type).build();
-        if (Collections.binarySearch(aclBuilder, defaultEntryKey,
-            ACL_ENTRY_COMPARATOR) < 0) {
+        if (Collections.binarySearch(scopedEntries.getDefaultEntries(),
+            defaultEntryKey, ACL_ENTRY_COMPARATOR) < 0) {
           throw new AclException(
             "Invalid default ACL: the user, group and other entries are required.");
         }
@@ -364,12 +363,17 @@ final class AclTransformation {
     for (AclEntryScope scope: scopeFound) {
       if (!providedMask.containsKey(scope) && maskNeeded.contains(scope) &&
           maskDirty.contains(scope)) {
+        // Caller explicitly removed mask entry, but it's required.
         throw new AclException(
           "Invalid ACL: mask is required, but it was deleted.");
       } else if (providedMask.containsKey(scope) &&
           (!scopeDirty.contains(scope) || maskDirty.contains(scope))) {
+        // Caller explicitly provided new mask, or we are preserving the existing
+        // mask in an unchanged scope.
         aclBuilder.add(providedMask.get(scope));
-      } else if (maskNeeded.contains(scope)) {
+      } else if (maskNeeded.contains(scope) || providedMask.containsKey(scope)) {
+        // Otherwise, if there are maskable entries present, or the ACL
+        // previously had a mask, then recalculate a mask automatically.
         aclBuilder.add(new AclEntry.Builder()
           .setScope(scope)
           .setType(MASK)
@@ -380,23 +384,6 @@ final class AclTransformation {
   }
 
   /**
-   * Returns the pivot point in the list between the access entries and the
-   * default entries.  This is the index of the first element in the list that is
-   * a default entry.
-   *
-   * @param aclBuilder ArrayList<AclEntry> containing entries to build
-   * @return int pivot point, or -1 if list contains no default entries
-   */
-  private static int calculatePivotOnDefaultEntries(List<AclEntry> aclBuilder) {
-    for (int i = 0; i < aclBuilder.size(); ++i) {
-      if (aclBuilder.get(i).getScope() == DEFAULT) {
-        return i;
-      }
-    }
-    return PIVOT_NOT_FOUND;
-  }
-
-  /**
    * Adds unspecified default entries by copying permissions from the
    * corresponding access entries.
    *
@@ -404,11 +391,10 @@ final class AclTransformation {
    */
   private static void copyDefaultsIfNeeded(List<AclEntry> aclBuilder) {
     Collections.sort(aclBuilder, ACL_ENTRY_COMPARATOR);
-    int pivot = calculatePivotOnDefaultEntries(aclBuilder);
-    if (pivot != PIVOT_NOT_FOUND) {
-      List<AclEntry> accessEntries = aclBuilder.subList(0, pivot);
-      List<AclEntry> defaultEntries = aclBuilder.subList(pivot,
-        aclBuilder.size());
+    ScopedAclEntries scopedEntries = new ScopedAclEntries(aclBuilder);
+    if (!scopedEntries.getDefaultEntries().isEmpty()) {
+      List<AclEntry> accessEntries = scopedEntries.getAccessEntries();
+      List<AclEntry> defaultEntries = scopedEntries.getDefaultEntries();
       List<AclEntry> copiedEntries = Lists.newArrayListWithCapacity(3);
       for (AclEntryType type: EnumSet.of(USER, GROUP, OTHER)) {
         AclEntry defaultEntryKey = new AclEntry.Builder().setScope(DEFAULT)

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jan 17 22:05:47 2014
@@ -1151,7 +1151,18 @@ public class FSDirectory implements Clos
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
-    inode.setPermission(permissions, inodesInPath.getLatestSnapshotId());
+    int snapshotId = inodesInPath.getLatestSnapshotId();
+    FsPermission oldPerm = inode.getPermissionStatus(snapshotId).getPermission();
+    // This method cannot toggle the ACL bit.
+    if (oldPerm.getAclBit() != permissions.getAclBit()) {
+      permissions = new FsPermission(
+        permissions.getUserAction(),
+        permissions.getGroupAction(),
+        permissions.getOtherAction(),
+        permissions.getStickyBit(),
+        oldPerm.getAclBit());
+    }
+    inode.setPermission(permissions, snapshotId);
   }
 
   void setOwner(String src, String username, String groupname)
@@ -2631,6 +2642,78 @@ public class FSDirectory implements Clos
     return addINode(path, symlink) ? symlink : null;
   }
 
+  void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+    writeLock();
+    try {
+      List<AclEntry> newAcl = unprotectedModifyAclEntries(src, aclSpec);
+      fsImage.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  private List<AclEntry> unprotectedModifyAclEntries(String src,
+      List<AclEntry> aclSpec) throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
+      snapshotId);
+    List<AclEntry> newAcl = AclTransformation.mergeAclEntries(existingAcl,
+      aclSpec);
+    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+    return newAcl;
+  }
+
+  void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+    writeLock();
+    try {
+      List<AclEntry> newAcl = unprotectedRemoveAclEntries(src, aclSpec);
+      fsImage.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  private List<AclEntry> unprotectedRemoveAclEntries(String src,
+      List<AclEntry> aclSpec) throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
+      snapshotId);
+    List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
+      existingAcl, aclSpec);
+    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+    return newAcl;
+  }
+
+  void removeDefaultAcl(String src) throws IOException {
+    writeLock();
+    try {
+      List<AclEntry> newAcl = unprotectedRemoveDefaultAcl(src);
+      fsImage.getEditLog().logSetAcl(src, newAcl);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  private List<AclEntry> unprotectedRemoveDefaultAcl(String src)
+      throws IOException {
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
+      snapshotId);
+    List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
+      existingAcl);
+    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+    return newAcl;
+  }
+
   void removeAcl(String src) throws IOException {
     writeLock();
     try {
@@ -2641,74 +2724,67 @@ public class FSDirectory implements Clos
     }
   }
 
-  private void unprotectedRemoveAcl(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException, FileNotFoundException {
+  private void unprotectedRemoveAcl(String src) throws IOException {
     assert hasWriteLock();
-    final INodeWithAdditionalFields node = resolveINodeWithAdditionalField(src);
-    AclFeature f = node.getAclFeature();
-    if (f != null)
-      node.removeAclFeature();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    AclStorage.removeINodeAcl(inode, snapshotId);
   }
 
   void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      unprotectedSetAcl(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, aclSpec);
+      List<AclEntry> newAcl = unprotectedSetAcl(src, aclSpec);
+      fsImage.getEditLog().logSetAcl(src, newAcl);
     } finally {
       writeUnlock();
     }
   }
 
-  void unprotectedSetAcl(String src, List<AclEntry> aclSpec)
-      throws UnresolvedLinkException, SnapshotAccessControlException,
-      FileNotFoundException {
-    assert hasWriteLock();
-    final INodeWithAdditionalFields node = resolveINodeWithAdditionalField(src);
-    AclFeature f = node.getAclFeature();
-
-    if (aclSpec.size() == 0) {
-      if (f != null)
-        node.removeAclFeature();
-      return;
+  List<AclEntry> unprotectedSetAcl(String src, List<AclEntry> aclSpec)
+      throws IOException {
+    // ACL removal is logged to edits as OP_SET_ACL with an empty list.
+    if (aclSpec.isEmpty()) {
+      unprotectedRemoveAcl(src);
+      return AclFeature.EMPTY_ENTRY_LIST;
     }
 
-    if (f == null) {
-      f = new AclFeature();
-      node.addAclFeature(f);
-    }
-    f.setEntries(aclSpec);
+    assert hasWriteLock();
+    INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+    INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(src, iip);
+    int snapshotId = iip.getLatestSnapshotId();
+    List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode,
+      snapshotId);
+    List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
+      aclSpec);
+    AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
+    return newAcl;
   }
 
   AclStatus getAclStatus(String src) throws IOException {
     readLock();
     try {
-      final INodeWithAdditionalFields node = resolveINodeWithAdditionalField(src);
-      AclFeature f = node.getAclFeature();
-
-      AclStatus.Builder builder = new AclStatus.Builder()
-          .owner(node.getUserName()).group(node.getGroupName())
-          .stickyBit(node.getFsPermission().getStickyBit());
-      if (f != null) {
-        builder.addEntries(f.getEntries());
-      }
-      return builder.build();
+      INodesInPath iip = rootDir.getINodesInPath4Write(normalizePath(src), true);
+      final INodeWithAdditionalFields inode = resolveINodeWithAdditionalFields(
+        src, iip);
+      int snapshotId = iip.getLatestSnapshotId();
+      List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
+      return new AclStatus.Builder()
+          .owner(inode.getUserName()).group(inode.getGroupName())
+          .stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
+          .addEntries(acl).build();
     } finally {
       readUnlock();
     }
   }
 
-  private INodeWithAdditionalFields resolveINodeWithAdditionalField(String src)
-      throws UnresolvedLinkException, SnapshotAccessControlException,
-      FileNotFoundException {
-    String srcs = normalizePath(src);
-    final INodesInPath iip = rootDir.getINodesInPath4Write(srcs, true);
+  private static INodeWithAdditionalFields resolveINodeWithAdditionalFields(
+      String src, INodesInPath iip) throws FileNotFoundException {
     INode inode = iip.getLastINode();
     if (!(inode instanceof INodeWithAdditionalFields))
       throw new FileNotFoundException("cannot find " + src);
-
-    final INodeWithAdditionalFields node = (INodeWithAdditionalFields) inode;
-    return node;
+    return (INodeWithAdditionalFields)inode;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jan 17 22:05:47 2014
@@ -7326,36 +7326,88 @@ public class FSNamesystem implements Nam
     return results;
   }
 
-  void modifyAclEntries(String src, Iterable<AclEntry> aclSpec) {
-    throw new UnsupportedOperationException("Unimplemented");
+  void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.modifyAclEntries(src, aclSpec);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "modifyAclEntries", src);
   }
 
-  void removeAclEntries(String src, Iterable<AclEntry> aclSpec) {
-    throw new UnsupportedOperationException("Unimplemented");
+  void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.removeAclEntries(src, aclSpec);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeAclEntries", src);
   }
 
-  void removeDefaultAcl(String src) {
-    throw new UnsupportedOperationException("Unimplemented");
+  void removeDefaultAcl(String src) throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
+      dir.removeDefaultAcl(src);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeDefaultAcl", src);
   }
 
   void removeAcl(String src) throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot remove acl on " + src);
+      checkNameNodeSafeMode("Cannot remove ACL on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
       dir.removeAcl(src);
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
+    logAuditEvent(true, "removeAcl", src);
   }
 
   void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot set acl on " + src);
+      checkNameNodeSafeMode("Cannot set ACL on " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      checkOwner(pc, src);
       dir.setAcl(src, aclSpec);
     } finally {
       writeUnlock();

Added: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ScopedAclEntries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ScopedAclEntries.java?rev=1559267&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ScopedAclEntries.java (added)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ScopedAclEntries.java Fri Jan 17 22:05:47 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+
+/**
+ * Groups a list of ACL entries into separate lists for access entries vs.
+ * default entries.
+ */
+@InterfaceAudience.Private
+final class ScopedAclEntries {
+  private static final int PIVOT_NOT_FOUND = -1;
+
+  private final List<AclEntry> accessEntries;
+  private final List<AclEntry> defaultEntries;
+
+  /**
+   * Creates a new ScopedAclEntries from the given list.  It is assumed that the
+   * list is already sorted such that all access entries precede all default
+   * entries.  Neither resulting list is ever null; an absent scope is empty.
+   *
+   * @param aclEntries List<AclEntry> to separate
+   */
+  public ScopedAclEntries(List<AclEntry> aclEntries) {
+    int pivot = calculatePivotOnDefaultEntries(aclEntries);
+    if (pivot != PIVOT_NOT_FOUND) {
+      accessEntries = pivot != 0 ? aclEntries.subList(0, pivot) :
+        Collections.<AclEntry>emptyList();
+      defaultEntries = aclEntries.subList(pivot, aclEntries.size());
+    } else {
+      accessEntries = aclEntries; // all access scope; empty input stays empty
+      defaultEntries = Collections.emptyList();
+    }
+  }
+
+  /**
+   * Returns access entries.
+   *
+   * @return List<AclEntry> containing just access entries, or an empty list if
+   *   there are no access entries
+   */
+  public List<AclEntry> getAccessEntries() {
+    return accessEntries;
+  }
+
+  /**
+   * Returns default entries.
+   *
+   * @return List<AclEntry> containing just default entries, or an empty list if
+   *   there are no default entries
+   */
+  public List<AclEntry> getDefaultEntries() {
+    return defaultEntries;
+  }
+
+  /**
+   * Returns the pivot point in the list between the access entries and the
+   * default entries.  This is the index of the first element in the list that is
+   * a default entry.
+   *
+   * @param aclBuilder List<AclEntry> to scan for the first default entry
+   * @return int pivot point, or -1 if list contains no default entries
+   */
+  private static int calculatePivotOnDefaultEntries(List<AclEntry> aclBuilder) {
+    for (int i = 0; i < aclBuilder.size(); ++i) {
+      if (aclBuilder.get(i).getScope() == AclEntryScope.DEFAULT) {
+        return i;
+      }
+    }
+    return PIVOT_NOT_FOUND;
+  }
+}

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java Fri Jan 17 22:05:47 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@@ -328,12 +329,48 @@ public class TestSafeMode {
         fs.setTimes(file1, 0, 0);
       }});
 
+    runFsFun("modifyAclEntries while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.modifyAclEntries(file1, Lists.<AclEntry>newArrayList());
+      }});
+
+    runFsFun("removeAclEntries while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.removeAclEntries(file1, Lists.<AclEntry>newArrayList());
+      }});
+
+    runFsFun("removeDefaultAcl while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.removeDefaultAcl(file1);
+      }});
+
+    runFsFun("removeAcl while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.removeAcl(file1);
+      }});
+
+    runFsFun("setAcl while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.setAcl(file1, Lists.<AclEntry>newArrayList());
+      }});
+
     try {
       DFSTestUtil.readFile(fs, file1);
     } catch (IOException ioe) {
       fail("Set times failed while in SM");
     }
 
+    try {
+      fs.getAclStatus(file1);
+    } catch (IOException ioe) {
+      fail("getAclStatus failed while in SM");
+    }
+
     assertFalse("Could not leave SM",
         dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
   }

Added: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java?rev=1559267&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java (added)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/AclTestHelpers.java Fri Jan 17 22:05:47 2014
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+
+/**
+ * Helper methods useful for writing ACL tests.
+ */
+final class AclTestHelpers {
+
+  /**
+   * Create a new AclEntry with scope, type and permission (no name).
+   *
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param permission FsAction set of permissions in the ACL entry
+   * @return AclEntry new AclEntry
+   */
+  public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
+      FsAction permission) {
+    return new AclEntry.Builder()
+      .setScope(scope)
+      .setType(type)
+      .setPermission(permission)
+      .build();
+  }
+
+  /**
+   * Create a new AclEntry with scope, type, name and permission.
+   *
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param name String optional ACL entry name
+   * @param permission FsAction set of permissions in the ACL entry
+   * @return AclEntry new AclEntry
+   */
+  public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
+      String name, FsAction permission) {
+    return new AclEntry.Builder()
+      .setScope(scope)
+      .setType(type)
+      .setName(name)
+      .setPermission(permission)
+      .build();
+  }
+
+  /**
+   * Create a new AclEntry with scope, type and name (no permission).
+   *
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param name String optional ACL entry name
+   * @return AclEntry new AclEntry
+   */
+  public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
+      String name) {
+    return new AclEntry.Builder()
+      .setScope(scope)
+      .setType(type)
+      .setName(name)
+      .build();
+  }
+
+  /**
+   * Create a new AclEntry with scope and type (no name or permission).
+   *
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @return AclEntry new AclEntry
+   */
+  public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type) {
+    return new AclEntry.Builder()
+      .setScope(scope)
+      .setType(type)
+      .build();
+  }
+}

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclTransformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclTransformation.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclTransformation.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAclTransformation.java Fri Jan 17 22:05:47 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import static org.apache.hadoop.fs.permission.AclEntryScope.*;
 import static org.apache.hadoop.fs.permission.AclEntryType.*;
 import static org.apache.hadoop.fs.permission.FsAction.*;
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
 import static org.apache.hadoop.hdfs.server.namenode.AclTransformation.*;
 import static org.junit.Assert.*;
 
@@ -1204,39 +1205,4 @@ public class TestAclTransformation {
       aclEntry(ACCESS, MASK, ALL));
     replaceAclEntries(existing, aclSpec);
   }
-
-  private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
-      FsAction permission) {
-    return new AclEntry.Builder()
-      .setScope(scope)
-      .setType(type)
-      .setPermission(permission)
-      .build();
-  }
-
-  private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
-      String name, FsAction permission) {
-    return new AclEntry.Builder()
-      .setScope(scope)
-      .setType(type)
-      .setName(name)
-      .setPermission(permission)
-      .build();
-  }
-
-  private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
-      String name) {
-    return new AclEntry.Builder()
-      .setScope(scope)
-      .setType(type)
-      .setName(name)
-      .build();
-  }
-
-  private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type) {
-    return new AclEntry.Builder()
-      .setScope(scope)
-      .setType(type)
-      .build();
-  }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java?rev=1559267&r1=1559266&r2=1559267&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithAcl.java Fri Jan 17 22:05:47 2014
@@ -17,15 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
+import static org.apache.hadoop.fs.permission.AclEntryScope.*;
+import static org.apache.hadoop.fs.permission.AclEntryType.*;
+import static org.apache.hadoop.fs.permission.FsAction.*;
+
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@@ -59,9 +61,8 @@ public class TestFSImageWithAcl {
     fs.mkdirs(new Path("/23"));
 
     AclEntry e = new AclEntry.Builder().setName("foo")
-        .setPermission(FsAction.READ_EXECUTE).setScope(AclEntryScope.DEFAULT)
-        .setType(AclEntryType.OTHER).build();
-    fs.setAcl(p, Lists.newArrayList(e));
+        .setPermission(READ_EXECUTE).setScope(ACCESS).setType(USER).build();
+    fs.modifyAclEntries(p, Lists.newArrayList(e));
 
     if (persistNamespace) {
       fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@@ -75,7 +76,24 @@ public class TestFSImageWithAcl {
     AclStatus s = cluster.getNamesystem().getAclStatus(p.toString());
     AclEntry[] returned = Lists.newArrayList(s.getEntries()).toArray(
         new AclEntry[0]);
-    Assert.assertArrayEquals(new AclEntry[] { e }, returned);
+    Assert.assertArrayEquals(new AclEntry[] {
+        aclEntry(ACCESS, USER, "foo", READ_EXECUTE),
+        aclEntry(ACCESS, GROUP, READ) }, returned);
+
+    fs.removeAcl(p);
+
+    if (persistNamespace) {
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    }
+
+    cluster.restartNameNode();
+    cluster.waitActive();
+
+    s = cluster.getNamesystem().getAclStatus(p.toString());
+    returned = Lists.newArrayList(s.getEntries()).toArray(new AclEntry[0]);
+    Assert.assertArrayEquals(new AclEntry[] { }, returned);
   }
 
   @Test

Added: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java?rev=1559267&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java (added)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java Fri Jan 17 22:05:47 2014
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.fs.permission.AclEntryScope.*;
+import static org.apache.hadoop.fs.permission.AclEntryType.*;
+import static org.apache.hadoop.fs.permission.FsAction.*;
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.*;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Unit tests covering FSPermissionChecker.  All tests in this suite have been
+ * cross-validated against Linux setfacl/getfacl to check for consistency of the
+ * HDFS implementation.
+ */
+public class TestFSPermissionChecker {
+  private static final long PREFERRED_BLOCK_SIZE = 128 * 1024 * 1024;
+  private static final short REPLICATION = 3;
+  private static final String SUPERGROUP = "supergroup";
+  private static final String SUPERUSER = "superuser";
+  private static final UserGroupInformation BRUCE =
+    UserGroupInformation.createUserForTesting("bruce", new String[] { });
+  private static final UserGroupInformation DIANA =
+    UserGroupInformation.createUserForTesting("diana", new String[] { "sales" });
+  private static final UserGroupInformation CLARK =
+    UserGroupInformation.createUserForTesting("clark", new String[] { "execs" });
+
+  private INodeDirectory inodeRoot;
+
+  @Before
+  public void setUp() {
+    PermissionStatus permStatus = PermissionStatus.createImmutable(SUPERUSER,
+      SUPERGROUP, FsPermission.createImmutable((short)0755));
+    inodeRoot = new INodeDirectory(INodeId.ROOT_INODE_ID,
+      INodeDirectory.ROOT_NAME, permStatus, 0L);
+  }
+
+  @Test
+  public void testAclOwner() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0640);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, NONE));
+    assertPermissionGranted(BRUCE, "/file1", READ);
+    assertPermissionGranted(BRUCE, "/file1", WRITE);
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionDenied(BRUCE, "/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", WRITE);
+    assertPermissionDenied(DIANA, "/file1", EXECUTE);
+  }
+
+  @Test
+  public void testAclNamedUser() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0640);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, USER, "diana", READ),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, NONE));
+    assertPermissionGranted(DIANA, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", WRITE);
+    assertPermissionDenied(DIANA, "/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/file1", READ_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", ALL);
+  }
+
+  @Test
+  public void testAclNamedUserDeny() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0644);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, USER, "diana", NONE),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, READ));
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", READ);
+  }
+
+  @Test
+  public void testAclNamedUserTraverseDeny() throws IOException {
+    INodeDirectory inodeDir = createINodeDirectory(inodeRoot, "dir1", "bruce",
+      "execs", (short)0755);
+    INodeFile inodeFile = createINodeFile(inodeDir, "file1", "bruce", "execs",
+      (short)0644);
+    addAcl(inodeDir,
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, USER, "diana", NONE),
+      aclEntry(ACCESS, GROUP, READ_EXECUTE),
+      aclEntry(ACCESS, MASK, READ_EXECUTE),
+      aclEntry(ACCESS, OTHER, READ_EXECUTE));
+    assertPermissionGranted(BRUCE, "/dir1/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/dir1/file1", READ);
+    assertPermissionDenied(DIANA, "/dir1/file1", READ);
+    assertPermissionDenied(DIANA, "/dir1/file1", WRITE);
+    assertPermissionDenied(DIANA, "/dir1/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/dir1/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/dir1/file1", READ_EXECUTE);
+    assertPermissionDenied(DIANA, "/dir1/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/dir1/file1", ALL);
+  }
+
+  @Test
+  public void testAclNamedUserMask() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0620);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, USER, "diana", READ),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, MASK, WRITE),
+      aclEntry(ACCESS, OTHER, NONE));
+    assertPermissionDenied(DIANA, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", WRITE);
+    assertPermissionDenied(DIANA, "/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/file1", READ_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", ALL);
+  }
+
+  @Test
+  public void testAclGroup() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0640);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, NONE));
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionDenied(CLARK, "/file1", WRITE);
+    assertPermissionDenied(CLARK, "/file1", EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", READ_WRITE);
+    assertPermissionDenied(CLARK, "/file1", READ_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", ALL);
+  }
+
+  @Test
+  public void testAclGroupDeny() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "sales",
+      (short)0604);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, MASK, NONE),
+      aclEntry(ACCESS, OTHER, READ));
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", WRITE);
+    assertPermissionDenied(DIANA, "/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/file1", READ_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", ALL);
+  }
+
+  /**
+   * The parent directory's unnamed group entry and mask are NONE while other
+   * is READ_EXECUTE. The test expects CLARK to be denied every access to the
+   * file beneath it, while BRUCE and DIANA can still reach the file.
+   */
+  @Test
+  public void testAclGroupTraverseDeny() throws IOException {
+    INodeDirectory inodeDir = createINodeDirectory(inodeRoot, "dir1", "bruce",
+      "execs", (short)0755);
+    // The file is only needed as a child of dir1; the ACL goes on the parent.
+    createINodeFile(inodeDir, "file1", "bruce", "execs", (short)0644);
+    addAcl(inodeDir,
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, MASK, NONE),
+      aclEntry(ACCESS, OTHER, READ_EXECUTE));
+    assertPermissionGranted(BRUCE, "/dir1/file1", READ_WRITE);
+    assertPermissionGranted(DIANA, "/dir1/file1", READ);
+    assertPermissionDenied(CLARK, "/dir1/file1", READ);
+    assertPermissionDenied(CLARK, "/dir1/file1", WRITE);
+    assertPermissionDenied(CLARK, "/dir1/file1", EXECUTE);
+    assertPermissionDenied(CLARK, "/dir1/file1", READ_WRITE);
+    assertPermissionDenied(CLARK, "/dir1/file1", READ_EXECUTE);
+    assertPermissionDenied(CLARK, "/dir1/file1", WRITE_EXECUTE);
+    assertPermissionDenied(CLARK, "/dir1/file1", ALL);
+  }
+
+  /**
+   * The parent directory has only unnamed ACCESS entries (group NONE, no
+   * mask); the only named entry, group "sales" NONE, is in the DEFAULT
+   * scope. The test expects CLARK to be denied every access to the file
+   * beneath the directory, while BRUCE and DIANA can still reach it.
+   * NOTE(review): DIANA being granted READ suggests the DEFAULT "sales"
+   * entry is not consulted for access checks - confirm against
+   * FSPermissionChecker.
+   */
+  @Test
+  public void testAclGroupTraverseDenyOnlyDefaultEntries() throws IOException {
+    INodeDirectory inodeDir = createINodeDirectory(inodeRoot, "dir1", "bruce",
+      "execs", (short)0755);
+    // The file is only needed as a child of dir1; the ACL goes on the parent.
+    createINodeFile(inodeDir, "file1", "bruce", "execs", (short)0644);
+    addAcl(inodeDir,
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, GROUP, NONE),
+      aclEntry(ACCESS, OTHER, READ_EXECUTE),
+      aclEntry(DEFAULT, USER, ALL),
+      aclEntry(DEFAULT, GROUP, "sales", NONE),
+      aclEntry(DEFAULT, GROUP, NONE),
+      aclEntry(DEFAULT, OTHER, READ_EXECUTE));
+    assertPermissionGranted(BRUCE, "/dir1/file1", READ_WRITE);
+    assertPermissionGranted(DIANA, "/dir1/file1", READ);
+    assertPermissionDenied(CLARK, "/dir1/file1", READ);
+    assertPermissionDenied(CLARK, "/dir1/file1", WRITE);
+    assertPermissionDenied(CLARK, "/dir1/file1", EXECUTE);
+    assertPermissionDenied(CLARK, "/dir1/file1", READ_WRITE);
+    assertPermissionDenied(CLARK, "/dir1/file1", READ_EXECUTE);
+    assertPermissionDenied(CLARK, "/dir1/file1", WRITE_EXECUTE);
+    assertPermissionDenied(CLARK, "/dir1/file1", ALL);
+  }
+
+  @Test
+  public void testAclGroupMask() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0644);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, READ_WRITE),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, READ));
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionDenied(CLARK, "/file1", WRITE);
+    assertPermissionDenied(CLARK, "/file1", EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", READ_WRITE);
+    assertPermissionDenied(CLARK, "/file1", READ_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", ALL);
+  }
+
+  /**
+   * A named group entry for "sales" grants READ (mask READ, other NONE). The
+   * test expects DIANA to be granted READ and denied every action that
+   * includes WRITE or EXECUTE; BRUCE keeps READ_WRITE and CLARK READ.
+   */
+  @Test
+  public void testAclNamedGroup() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0640);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, GROUP, "sales", READ),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, NONE));
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionGranted(DIANA, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", WRITE);
+    assertPermissionDenied(DIANA, "/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/file1", READ_EXECUTE);
+    // WRITE_EXECUTE was the only combination left unchecked; the sibling
+    // tests cover the full set of actions.
+    assertPermissionDenied(DIANA, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", ALL);
+  }
+
+  @Test
+  public void testAclNamedGroupDeny() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "sales",
+      (short)0644);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, GROUP, "execs", NONE),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, READ));
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionGranted(DIANA, "/file1", READ);
+    assertPermissionDenied(CLARK, "/file1", READ);
+    assertPermissionDenied(CLARK, "/file1", WRITE);
+    assertPermissionDenied(CLARK, "/file1", EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", READ_WRITE);
+    assertPermissionDenied(CLARK, "/file1", READ_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", ALL);
+  }
+
+  /**
+   * A named group entry of NONE for "sales" on the parent directory is
+   * expected to deny DIANA every access to a file beneath that directory,
+   * while BRUCE and CLARK can still reach the file.
+   */
+  @Test
+  public void testAclNamedGroupTraverseDeny() throws IOException {
+    INodeDirectory inodeDir = createINodeDirectory(inodeRoot, "dir1", "bruce",
+      "execs", (short)0755);
+    // The file is only needed as a child of dir1; the ACL goes on the parent.
+    createINodeFile(inodeDir, "file1", "bruce", "execs", (short)0644);
+    addAcl(inodeDir,
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, GROUP, READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "sales", NONE),
+      aclEntry(ACCESS, MASK, READ_EXECUTE),
+      aclEntry(ACCESS, OTHER, READ_EXECUTE));
+    assertPermissionGranted(BRUCE, "/dir1/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/dir1/file1", READ);
+    assertPermissionDenied(DIANA, "/dir1/file1", READ);
+    assertPermissionDenied(DIANA, "/dir1/file1", WRITE);
+    assertPermissionDenied(DIANA, "/dir1/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/dir1/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/dir1/file1", READ_EXECUTE);
+    assertPermissionDenied(DIANA, "/dir1/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/dir1/file1", ALL);
+  }
+
+  @Test
+  public void testAclNamedGroupMask() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "execs",
+      (short)0644);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, READ_WRITE),
+      aclEntry(ACCESS, GROUP, READ),
+      aclEntry(ACCESS, GROUP, "sales", READ_WRITE),
+      aclEntry(ACCESS, MASK, READ),
+      aclEntry(ACCESS, OTHER, READ));
+    assertPermissionGranted(BRUCE, "/file1", READ_WRITE);
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionGranted(DIANA, "/file1", READ);
+    assertPermissionDenied(DIANA, "/file1", WRITE);
+    assertPermissionDenied(DIANA, "/file1", EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", READ_WRITE);
+    assertPermissionDenied(DIANA, "/file1", READ_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(DIANA, "/file1", ALL);
+  }
+
+  @Test
+  public void testAclOther() throws IOException {
+    INodeFile inodeFile = createINodeFile(inodeRoot, "file1", "bruce", "sales",
+      (short)0774);
+    addAcl(inodeFile,
+      aclEntry(ACCESS, USER, ALL),
+      aclEntry(ACCESS, USER, "diana", ALL),
+      aclEntry(ACCESS, GROUP, READ_WRITE),
+      aclEntry(ACCESS, MASK, ALL),
+      aclEntry(ACCESS, OTHER, READ));
+    assertPermissionGranted(BRUCE, "/file1", ALL);
+    assertPermissionGranted(DIANA, "/file1", ALL);
+    assertPermissionGranted(CLARK, "/file1", READ);
+    assertPermissionDenied(CLARK, "/file1", WRITE);
+    assertPermissionDenied(CLARK, "/file1", EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", READ_WRITE);
+    assertPermissionDenied(CLARK, "/file1", READ_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", WRITE_EXECUTE);
+    assertPermissionDenied(CLARK, "/file1", ALL);
+  }
+
+  /**
+   * Applies the given ACL entries to an inode through
+   * {@link AclStorage#updateINodeAcl}, using the current (non-snapshot)
+   * state id.
+   *
+   * @param inode inode to receive the ACL
+   * @param acl ACL entries to apply, passed on as a list in the given order
+   * @throws IOException if AclStorage rejects the update
+   */
+  private void addAcl(INodeWithAdditionalFields inode, AclEntry... acl)
+      throws IOException {
+    // inode is already declared as INodeWithAdditionalFields; no cast needed.
+    AclStorage.updateINodeAcl(inode, Arrays.asList(acl),
+      Snapshot.CURRENT_STATE_ID);
+  }
+
+  /**
+   * Runs a permission check for the given user, path and access, starting
+   * from inodeRoot. The assertion passes when the check returns normally;
+   * any exception it throws propagates and fails the calling test.
+   *
+   * @param user user on whose behalf the check is made
+   * @param path absolute path to check
+   * @param access access requested on the path
+   * @throws IOException if the permission check throws
+   */
+  private void assertPermissionGranted(UserGroupInformation user, String path,
+      FsAction access) throws IOException {
+    FSPermissionChecker checker =
+      new FSPermissionChecker(SUPERUSER, SUPERGROUP, user);
+    checker.checkPermission(path, inodeRoot, false, null, null, access, null,
+      true);
+  }
+
+  /**
+   * Runs the same permission check as assertPermissionGranted and requires
+   * it to throw AccessControlException. If the check returns normally, the
+   * test fails with a message naming the user, path and access.
+   *
+   * @param user user on whose behalf the check is made
+   * @param path absolute path to check
+   * @param access access requested on the path
+   * @throws IOException if the check fails with something other than
+   *   AccessControlException
+   */
+  private void assertPermissionDenied(UserGroupInformation user, String path,
+      FsAction access) throws IOException {
+    try {
+      new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(path,
+        inodeRoot, false, null, null, access, null, true);
+      // Message fixed to read "user = ..." like the path and access fields.
+      fail("expected AccessControlException for user = " + user + ", path = " +
+        path + ", access = " + access);
+    } catch (AccessControlException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Builds a directory inode with the given owner, group and permission
+   * bits and attaches it to the parent as a child.
+   *
+   * @param parent directory that receives the new child
+   * @param name local name of the new directory, encoded as UTF-8
+   * @param owner owning user name
+   * @param group owning group name
+   * @param perm permission bits, e.g. (short)0755
+   * @return the newly created directory inode
+   * @throws IOException if the name cannot be encoded
+   */
+  private static INodeDirectory createINodeDirectory(INodeDirectory parent,
+      String name, String owner, String group, short perm) throws IOException {
+    FsPermission fsPerm = FsPermission.createImmutable(perm);
+    PermissionStatus status =
+      PermissionStatus.createImmutable(owner, group, fsPerm);
+    byte[] localName = name.getBytes("UTF-8");
+    INodeDirectory dir = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID,
+      localName, status, 0L);
+    parent.addChild(dir);
+    return dir;
+  }
+
+  /**
+   * Builds a file inode with the given owner, group and permission bits,
+   * no blocks, and the test's REPLICATION and PREFERRED_BLOCK_SIZE, then
+   * attaches it to the parent as a child.
+   *
+   * @param parent directory that receives the new child
+   * @param name local name of the new file, encoded as UTF-8
+   * @param owner owning user name
+   * @param group owning group name
+   * @param perm permission bits, e.g. (short)0644
+   * @return the newly created file inode
+   * @throws IOException if the name cannot be encoded
+   */
+  private static INodeFile createINodeFile(INodeDirectory parent, String name,
+      String owner, String group, short perm) throws IOException {
+    FsPermission fsPerm = FsPermission.createImmutable(perm);
+    PermissionStatus status =
+      PermissionStatus.createImmutable(owner, group, fsPerm);
+    byte[] localName = name.getBytes("UTF-8");
+    INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, localName,
+      status, 0L, 0L, null, REPLICATION, PREFERRED_BLOCK_SIZE);
+    parent.addChild(file);
+    return file;
+  }
+}



Mime
View raw message