hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r614776 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/security/ src/test/org/apache/hadoop/dfs/
Date Thu, 24 Jan 2008 02:55:58 GMT
Author: shv
Date: Wed Jan 23 18:55:54 2008
New Revision: 614776

URL: http://svn.apache.org/viewvc?rev=614776&view=rev
Log:
HADOOP-2431. Test HDFS File Permissions. Contributed by Hairong Kuang.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDFSPermission.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=614776&r1=614775&r2=614776&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jan 23 18:55:54 2008
@@ -285,6 +285,8 @@
     to specify the blocksize, replication factor and the buffersize to be
     used for the underlying HDFS file. (Alejandro Abdelnur via acmurthy) 
 
+    HADOOP-2431. Test HDFS File Permissions. (Hairong Kuang via shv)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=614776&r1=614775&r2=614776&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Jan 23 18:55:54
2008
@@ -1787,7 +1787,7 @@
   public DFSFileInfo[] getListing(String src) throws IOException {
     if (isPermissionEnabled) {
       if (dir.isDir(src)) {
-        checkPathAccess(src, FsAction.READ);
+        checkPathAccess(src, FsAction.READ_EXECUTE);
       }
       else {
         checkTraverse(src);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=614776&r1=614775&r2=614776&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Wed Jan 23 18:55:54 2008
@@ -28,7 +28,9 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.permission.AccessControlException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RemoteException;
 
 /****************************************************************
  * An abstract base class for a fairly generic filesystem.  It
@@ -517,6 +519,10 @@
     try {
       return getFileStatus(f).isDir();
     } catch (IOException e) {
+      if (e instanceof RemoteException && AccessControlException.class
+          .getName().equals(((RemoteException)e).getClassName())) {
+        throw e;
+      }
       return false;               // f does not exist
     }
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java?rev=614776&r1=614775&r2=614776&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java Wed
Jan 23 18:55:54 2008
@@ -54,8 +54,7 @@
    * @param groupNames groups list, first of which is the default group
    * @exception IllegalArgumentException if any argument is null
    */
-  UnixUserGroupInformation(String userName, String[] groupNames)
-  throws IOException {
+  public UnixUserGroupInformation(String userName, String[] groupNames) {
     setUserGroupNames(userName, groupNames);
   }
 

Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDFSPermission.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDFSPermission.java?rev=614776&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDFSPermission.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDFSPermission.java Wed Jan 23 18:55:54
2008
@@ -0,0 +1,974 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+/** Unit tests for permission */
+public class TestDFSPermission extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestDFSPermission.class);
+  final private static Configuration conf = new Configuration();
+  final private static String PERMISSION_EXCEPTION_NAME = 
+    AccessControlException.class.getName();
+  
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String GROUP3_NAME = "group3";
+  final private static String GROUP4_NAME = "group4";
+  final private static String USER1_NAME = "user1";
+  final private static String USER2_NAME = "user2";
+  final private static String USER3_NAME = "user3";
+
+  private static UnixUserGroupInformation SUPERUSER;
+  private static UnixUserGroupInformation USER1;
+  private static UnixUserGroupInformation USER2;
+  private static UnixUserGroupInformation USER3;
+  
+  final private static short MAX_PERMISSION = 511;
+  final private static short DEFAULT_UMASK = 022;
+  final private static short FILE_MASK = 0666;
+  final private static FsPermission DEFAULT_PERMISSION = 
+    FsPermission.createImmutable((short) 0777);
+  final static private int NUM_TEST_PERMISSIONS = 
+    conf.getInt("test.dfs.permission.num", 10) * (MAX_PERMISSION + 1) / 100;
+
+
+  final private static String PATH_NAME = "xx";
+  final private static Path FILE_DIR_PATH = new Path("/", PATH_NAME);
+  final private static Path NON_EXISTENT_PATH = new Path("/parent", PATH_NAME);
+  final private static Path NON_EXISTENT_FILE = new Path("/NonExistentFile");
+
+  private FileSystem fs;
+  private static Random r;
+
+  static {
+    try {
+      // Initialize the random number generator and log the seed
+      long seed = FSNamesystem.now();
+      r = new Random(seed);
+      LOG.info("Random number generator uses seed " + seed);
+      LOG.info("NUM_TEST_PERMISSIONS=" + NUM_TEST_PERMISSIONS);
+      
+      // explicitly turn on permission checking
+      conf.setBoolean("dfs.permissions", true);
+      
+      // Initiate all four users
+      SUPERUSER = UnixUserGroupInformation.login(conf);
+      USER1 = new UnixUserGroupInformation(USER1_NAME, new String[] {
+          GROUP1_NAME, GROUP2_NAME });
+      USER2 = new UnixUserGroupInformation(USER2_NAME, new String[] {
+          GROUP2_NAME, GROUP3_NAME });
+      USER3 = new UnixUserGroupInformation(USER3_NAME, new String[] {
+          GROUP3_NAME, GROUP4_NAME });
+    } catch (LoginException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** This tests if permission setting in create, mkdir, and 
+   * setPermission works correctly
+   */
+  public void testPermissionSetting() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      cluster.waitActive();
+      fs = FileSystem.get(conf);
+      LOG.info("ROOT=" + fs.getFileStatus(new Path("/")));
+      testPermissionSetting(OpType.CREATE); // test file creation
+      testPermissionSetting(OpType.MKDIRS); // test directory creation
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /* check permission setting works correctly for file or directory */
+  private void testPermissionSetting(OpType op) throws Exception {
+    // case 1: use default permission but all possible umasks
+    PermissionGenerator generator = new PermissionGenerator(r);
+    for (short i = 0; i < NUM_TEST_PERMISSIONS; i++) {
+      createAndCheckPermission(op, FILE_DIR_PATH, generator.next(),
+          new FsPermission(DEFAULT_PERMISSION), true);
+    }
+
+    // case 2: use permission 0643 and the default umask
+    createAndCheckPermission(op, FILE_DIR_PATH, DEFAULT_UMASK,
+        new FsPermission((short) 0643), true);
+
+    // case 3: use permission 0643 and umask 0222
+    createAndCheckPermission(op, FILE_DIR_PATH, (short) 0222, 
+        new FsPermission((short) 0643), false);
+
+    // case 4: set permission
+    fs.setPermission(FILE_DIR_PATH, new FsPermission((short) 0111));
+    short expectedPermission = (short) ((op == OpType.CREATE) ? 0 : 0111);
+    checkPermission(FILE_DIR_PATH, expectedPermission, true);
+
+    // case 5: test non-existent parent directory
+    assertFalse(fs.exists(NON_EXISTENT_PATH));
+    createAndCheckPermission(op, NON_EXISTENT_PATH, DEFAULT_UMASK,
+        new FsPermission(DEFAULT_PERMISSION), false);
+    Path parent = NON_EXISTENT_PATH.getParent();
+    checkPermission(parent, getPermission(parent.getParent()), true);
+  }
+
+  /* get the permission of a file/directory */
+  private short getPermission(Path path) throws IOException {
+    return fs.getFileStatus(path).getPermission().toShort();
+  }
+
+  /* create a file/directory with the default umask and permission */
+  private void create(OpType op, Path name) throws IOException {
+    create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION));
+  }
+
+  /* create a file/directory with the given umask and permission */
+  private void create(OpType op, Path name, short umask, 
+      FsPermission permission) throws IOException {
+    // set umask in configuration
+    conf.setInt(FsPermission.UMASK_LABEL, umask);
+
+    // create the file/directory
+    switch (op) {
+    case CREATE:
+      FSDataOutputStream out = fs.create(name, permission, true, conf.getInt(
+          "io.file.buffer.size", 4096), fs.getDefaultReplication(), fs
+          .getDefaultBlockSize(), null);
+      out.close();
+      break;
+    case MKDIRS:
+      fs.mkdirs(name, permission);
+      break;
+    default:
+      throw new IOException("Unsupported operation: " + op);
+    }
+  }
+
+  /* Create a file/directory with the provided umask and permission, then
+   * check that the permission is set correctly.
+   * If the delete flag is true, delete the file afterwards; otherwise leave
+   * it in the file system.
+   */
+  private void createAndCheckPermission(OpType op, Path name, short umask,
+      FsPermission permission, boolean delete) throws Exception {
+    // create the file/directory
+    create(op, name, umask, permission);
+
+    // get the short form of the permission
+    short permissionNum = (DEFAULT_PERMISSION.equals(permission)) ? MAX_PERMISSION
+        : permission.toShort();
+
+    // get the expected permission
+    short expectedPermission = (op == OpType.CREATE) ? (short) (~umask
+        & permissionNum & FILE_MASK) : (short) (~umask & permissionNum);
+
+    // check if permission is correctly set
+    checkPermission(name, expectedPermission, delete);
+  }
+
+  /* Check if the permission of a file/directory is the same as the
+   * expected permission; If the delete flag is true, delete the
+   * file/directory afterwards.
+   */
+  private void checkPermission(Path name, short expectedPermission,
+      boolean delete) throws IOException {
+    try {
+      // check its permission
+      assertEquals(getPermission(name), expectedPermission);
+    } finally {
+      // delete the file
+      if (delete) {
+        fs.delete(name);
+      }
+    }
+  }
+
+  /* check if the ownership of a file/directory is set correctly */
+  public void testOwnership() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      cluster.waitActive();
+      testOwnership(OpType.CREATE); // test file creation
+      testOwnership(OpType.MKDIRS); // test directory creation
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /* change a file/directory's owner and group.
+   * if expectDeny is set, expect an AccessControlException.
+   */
+  private void setOwner(Path path, String owner, String group,
+      boolean expectDeny) throws IOException {
+    try {
+      String expectedOwner = (owner == null) ? getOwner(path) : owner;
+      String expectedGroup = (group == null) ? getGroup(path) : group;
+      fs.setOwner(path, owner, group);
+      checkOwnership(path, expectedOwner, expectedGroup);
+      assertFalse(expectDeny);
+    } catch (RemoteException e) {
+      if (PERMISSION_EXCEPTION_NAME.equals(e.getClassName())) {
+        assertTrue(expectDeny);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /* check ownership is set correctly for a file or directory */
+  private void testOwnership(OpType op) throws Exception {
+    // case 1: superuser create a file/directory
+    fs = FileSystem.get(conf);
+    create(op, FILE_DIR_PATH, DEFAULT_UMASK,
+        new FsPermission(DEFAULT_PERMISSION));
+    checkOwnership(FILE_DIR_PATH, SUPERUSER.getUserName(),
+        getGroup(FILE_DIR_PATH.getParent()));
+
+    // case 2: superuser changes FILE_DIR_PATH's owner to be <user1, group3>
+    setOwner(FILE_DIR_PATH, USER1.getUserName(), GROUP3_NAME, false);
+
+    // case 3: user1 changes FILE_DIR_PATH's owner to be user2
+    login(USER1);
+    setOwner(FILE_DIR_PATH, USER2.getUserName(), null, true);
+
+    // case 4: user1 changes FILE_DIR_PATH's group to be group1 which it belongs
+    // to
+    setOwner(FILE_DIR_PATH, null, GROUP1_NAME, false);
+
+    // case 5: user1 changes FILE_DIR_PATH's group to be group3
+    // which it does not belong to
+    setOwner(FILE_DIR_PATH, null, GROUP3_NAME, true);
+
+    // case 6: user2 (non-owner) changes FILE_DIR_PATH's group to be group3
+    login(USER2);
+    setOwner(FILE_DIR_PATH, null, GROUP3_NAME, true);
+
+    // case 7: user2 (non-owner) changes FILE_DIR_PATH's user to be user2
+    setOwner(FILE_DIR_PATH, USER2.getUserName(), null, true);
+
+    // delete the file/directory
+    login(SUPERUSER);
+    fs.delete(FILE_DIR_PATH);
+  }
+
+  /* Return the group owner of the file/directory */
+  private String getGroup(Path path) throws IOException {
+    return fs.getFileStatus(path).getGroup();
+  }
+
+  /* Return the file owner of the file/directory */
+  private String getOwner(Path path) throws IOException {
+    return fs.getFileStatus(path).getOwner();
+  }
+
+  /* check if ownership is set correctly */
+  private void checkOwnership(Path name, String expectedOwner,
+      String expectedGroup) throws IOException {
+    // check its owner and group
+    FileStatus status = fs.getFileStatus(name);
+    assertEquals(status.getOwner(), expectedOwner);
+    assertEquals(status.getGroup(), expectedGroup);
+  }
+
+  final static private String ANCESTOR_NAME = "/ancestor";
+  final static private String PARENT_NAME = "parent";
+  final static private String FILE_NAME = "file";
+  final static private String DIR_NAME = "dir";
+  final static private String FILE_DIR_NAME = "filedir";
+
+  private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
+    GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE
+  };
+
+  /* Check if namenode performs permission checking correctly for
+   * superuser, file owner, group owner, and other users */
+  public void testPermissionChecking() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      cluster.waitActive();
+      fs = FileSystem.get(conf);
+
+      // set the permission of the root to be world-wide rwx
+      fs.setPermission(new Path("/"), new FsPermission((short)0777));
+      
+      // create a directory hierarchy and set a random permission for each inode
+      PermissionGenerator ancestorPermissionGenerator = 
+        new PermissionGenerator(r);
+      PermissionGenerator dirPermissionGenerator = new PermissionGenerator(r);
+      PermissionGenerator filePermissionGenerator = new PermissionGenerator(r);
+      short[] ancestorPermissions = new short[NUM_TEST_PERMISSIONS];
+      short[] parentPermissions = new short[NUM_TEST_PERMISSIONS];
+      short[] permissions = new short[NUM_TEST_PERMISSIONS];
+      Path[] ancestorPaths = new Path[NUM_TEST_PERMISSIONS];
+      Path[] parentPaths = new Path[NUM_TEST_PERMISSIONS];
+      Path[] filePaths = new Path[NUM_TEST_PERMISSIONS];
+      Path[] dirPaths = new Path[NUM_TEST_PERMISSIONS];
+      for (int i = 0; i < NUM_TEST_PERMISSIONS; i++) {
+        // create ancestor directory
+        ancestorPaths[i] = new Path(ANCESTOR_NAME + i);
+        create(OpType.MKDIRS, ancestorPaths[i]);
+        fs.setOwner(ancestorPaths[i], USER1_NAME, GROUP2_NAME);
+        // create parent directory
+        parentPaths[i] = new Path(ancestorPaths[i], PARENT_NAME + i);
+        create(OpType.MKDIRS, parentPaths[i]);
+        // change parent directory's ownership to be user1
+        fs.setOwner(parentPaths[i], USER1_NAME, GROUP2_NAME);
+
+        filePaths[i] = new Path(parentPaths[i], FILE_NAME + i);
+        dirPaths[i] = new Path(parentPaths[i], DIR_NAME + i);
+
+        // makes sure that each inode at the same level 
+        // has a different permission
+        ancestorPermissions[i] = ancestorPermissionGenerator.next();
+        parentPermissions[i] = dirPermissionGenerator.next();
+        permissions[i] = filePermissionGenerator.next();
+        fs.setPermission(ancestorPaths[i], new FsPermission(
+            ancestorPermissions[i]));
+        fs.setPermission(parentPaths[i], new FsPermission(
+                parentPermissions[i]));
+      }
+
+      /* file owner */
+      testPermissionCheckingPerUser(USER1, ancestorPermissions,
+          parentPermissions, permissions, parentPaths, filePaths, dirPaths);
+      /* group owner */
+      testPermissionCheckingPerUser(USER2, ancestorPermissions,
+          parentPermissions, permissions, parentPaths, filePaths, dirPaths);
+      /* other users */
+      testPermissionCheckingPerUser(USER3, ancestorPermissions,
+          parentPermissions, permissions, parentPaths, filePaths, dirPaths);
+      /* superuser */
+      testPermissionCheckingPerUser(SUPERUSER, ancestorPermissions,
+          parentPermissions, permissions, parentPaths, filePaths, dirPaths);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /* Check if namenode performs permission checking correctly 
+   * for the given user for operations mkdir, open, setReplication, 
+   * getFileInfo, isDirectory, exists, getContentLength, list, rename,
+   * and delete */
+  private void testPermissionCheckingPerUser(UnixUserGroupInformation ugi,
+      short[] ancestorPermission, short[] parentPermission,
+      short[] filePermission, Path[] parentDirs, Path[] files, Path[] dirs)
+      throws Exception {
+    login(SUPERUSER);
+    for (int i = 0; i < NUM_TEST_PERMISSIONS; i++) {
+      create(OpType.CREATE, files[i]);
+      create(OpType.MKDIRS, dirs[i]);
+      fs.setOwner(files[i], USER1_NAME, GROUP2_NAME);
+      fs.setOwner(dirs[i], USER1_NAME, GROUP2_NAME);
+      checkOwnership(dirs[i], USER1_NAME, GROUP2_NAME);
+      checkOwnership(files[i], USER1_NAME, GROUP2_NAME);
+
+      FsPermission fsPermission = new FsPermission(filePermission[i]);
+      fs.setPermission(files[i], fsPermission);
+      fs.setPermission(dirs[i], fsPermission);
+    }
+
+    login(ugi);
+    for (int i = 0; i < NUM_TEST_PERMISSIONS; i++) {
+      testCreateMkdirs(ugi, new Path(parentDirs[i], FILE_DIR_NAME),
+          ancestorPermission[i], parentPermission[i]);
+      testOpen(ugi, files[i], ancestorPermission[i], parentPermission[i],
+          filePermission[i]);
+      testSetReplication(ugi, files[i], ancestorPermission[i],
+          parentPermission[i], filePermission[i]);
+      testStats(ugi, files[i], ancestorPermission[i], parentPermission[i]);
+      testList(ugi, files[i], dirs[i], ancestorPermission[i],
+          parentPermission[i], filePermission[i]);
+      int next = i == NUM_TEST_PERMISSIONS - 1 ? 0 : i + 1;
+      testRename(ugi, files[i], files[next], ancestorPermission[i],
+          parentPermission[i], ancestorPermission[next], parentPermission[next]);
+      testDeleteFile(ugi, files[i], ancestorPermission[i], parentPermission[i]);
+      testDeleteDir(ugi, dirs[i], ancestorPermission[i], parentPermission[i],
+          filePermission[i], null);
+    }
+    
+    // test non existent file
+    checkNonExistentFile();
+  }
+
+  /* A random permission generator that guarantees that each permission
+   * value is generated only once.
+   */
+  static private class PermissionGenerator {
+    private Random r;
+    private short permissions[] = new short[MAX_PERMISSION + 1];
+    private int numLeft = MAX_PERMISSION + 1;
+
+    PermissionGenerator(Random r) {
+      this.r = r;
+      for (int i = 0; i <= MAX_PERMISSION; i++) {
+        permissions[i] = (short) i;
+      }
+    }
+
+    short next() throws IOException {
+      if (numLeft == 0) {
+        throw new IOException("No more permission is avaialbe");
+      }
+      int index = r.nextInt(numLeft); // choose which permission to return
+      numLeft--; // decrement the counter
+
+      // swap the chosen permission with last available permission in the array
+      short temp = permissions[numLeft];
+      permissions[numLeft] = permissions[index];
+      permissions[index] = temp;
+
+      return permissions[numLeft];
+    }
+  }
+
+  /* A base class that verifies the permission checking is correct 
+   * for an operation */
+  abstract class PermissionVerifier {
+    protected Path path;
+    protected short ancestorPermission;
+    protected short parentPermission;
+    private short permission;
+    protected short requiredAncestorPermission;
+    protected short requiredParentPermission;
+    protected short requiredPermission;
+    final static protected short opAncestorPermission = SEARCH_MASK;
+    protected short opParentPermission;
+    protected short opPermission;
+    protected UnixUserGroupInformation ugi;
+
+    /* initialize */
+    protected void set(Path path, short ancestorPermission,
+        short parentPermission, short permission) {
+      this.path = path;
+      this.ancestorPermission = ancestorPermission;
+      this.parentPermission = parentPermission;
+      this.permission = permission;
+      setOpPermission();
+      this.ugi = null;
+    }
+
+    /* Perform an operation and verify if the permission checking is correct */
+    void verifyPermission(UnixUserGroupInformation ugi) throws LoginException,
+        IOException {
+      if (this.ugi != ugi) {
+        setRequiredPermissions(ugi);
+        this.ugi = ugi;
+      }
+
+      try {
+        try {
+          call();
+          assertFalse(expectPermissionDeny());
+        } catch (RemoteException e) {
+          if (PERMISSION_EXCEPTION_NAME.equals(e.getClassName())) {
+            assertTrue(expectPermissionDeny());
+          } else {
+            throw e; // re-throw
+          }
+        }
+      } catch (AssertionFailedError ae) {
+        logPermissions();
+        throw ae;
+      }
+    }
+
+    /* Log the permissions and required permissions */
+    protected void logPermissions() {
+      LOG.info("required ancestor permission:"
+          + Integer.toOctalString(requiredAncestorPermission));
+      LOG.info("ancestor permission: "
+          + Integer.toOctalString(ancestorPermission));
+      LOG.info("required parent permission:"
+          + Integer.toOctalString(requiredParentPermission));
+      LOG.info("parent permission: " + Integer.toOctalString(parentPermission));
+      LOG.info("required permission:"
+          + Integer.toOctalString(requiredPermission));
+      LOG.info("permission: " + Integer.toOctalString(permission));
+    }
+
+    /* Return true if an AccessControlException is expected */
+    protected boolean expectPermissionDeny() {
+      return (requiredPermission & permission) != requiredPermission
+          || (requiredParentPermission & parentPermission) !=
+                            requiredParentPermission
+          || (requiredAncestorPermission & ancestorPermission) !=
+                            requiredAncestorPermission;
+    }
+
+    /* Set the permissions required to pass the permission checking */
+    protected void setRequiredPermissions(UnixUserGroupInformation ugi)
+        throws IOException {
+      if (SUPERUSER.equals(ugi)) {
+        requiredAncestorPermission = SUPER_MASK;
+        requiredParentPermission = SUPER_MASK;
+        requiredPermission = SUPER_MASK;
+      } else if (USER1.equals(ugi)) {
+        requiredAncestorPermission = (short)(opAncestorPermission & OWNER_MASK);
+        requiredParentPermission = (short)(opParentPermission & OWNER_MASK);
+        requiredPermission = (short)(opPermission & OWNER_MASK);
+      } else if (USER2.equals(ugi)) {
+        requiredAncestorPermission = (short)(opAncestorPermission & GROUP_MASK);
+        requiredParentPermission = (short)(opParentPermission & GROUP_MASK);
+        requiredPermission = (short)(opPermission & GROUP_MASK);
+      } else if (USER3.equals(ugi)) {
+        requiredAncestorPermission = (short)(opAncestorPermission & OTHER_MASK);
+        requiredParentPermission = (short)(opParentPermission & OTHER_MASK);
+        requiredPermission = (short)(opPermission & OTHER_MASK);
+      } else {
+        throw new IllegalArgumentException("Non-supported user: " + ugi);
+      }
+    }
+
+    /* Set the rwx permissions required for the operation */
+    abstract void setOpPermission();
+
+    /* Perform the operation */
+    abstract void call() throws IOException;
+  }
+
+  final static private short SUPER_MASK = 0;
+  final static private short READ_MASK = 0444;
+  final static private short WRITE_MASK = 0222;
+  final static private short SEARCH_MASK = 0111;
+  final static private short NULL_MASK = 0;
+  final static private short OWNER_MASK = 0700;
+  final static private short GROUP_MASK = 0070;
+  final static private short OTHER_MASK = 0007;
+
+  /* A class that verifies the permission checking is correct for create/mkdir*/
+  private class CreatePermissionVerifier extends PermissionVerifier {
+    private OpType opType;
+    private boolean cleanup = true;
+
+    /* initialize */
+    protected void set(Path path, OpType opType, short ancestorPermission,
+        short parentPermission) {
+      super.set(path, ancestorPermission, parentPermission, NULL_MASK);
+      setOpType(opType);
+    }
+
+    void setCleanup(boolean cleanup) {
+      this.cleanup = cleanup;
+    }
+    
+    /* set whether the operation is mkdir or create */
+    void setOpType(OpType opType) {
+      this.opType = opType;
+    }
+
+    @Override
+    void setOpPermission() {
+      this.opParentPermission = SEARCH_MASK | WRITE_MASK;
+    }
+
+    @Override
+    void call() throws IOException {
+      create(opType, path);
+      if (cleanup) {
+        fs.delete(path);
+      }
+    }
+  }
+
+  private CreatePermissionVerifier createVerifier =
+    new CreatePermissionVerifier();
+  /* test if the permission checking of create/mkdir is correct */
+  private void testCreateMkdirs(UnixUserGroupInformation ugi, Path path,
+      short ancestorPermission, short parentPermission) throws Exception {
+    createVerifier.set(path, OpType.MKDIRS, ancestorPermission,
+        parentPermission);
+    createVerifier.verifyPermission(ugi);
+    createVerifier.setOpType(OpType.CREATE);
+    createVerifier.setCleanup(false);
+    createVerifier.verifyPermission(ugi);
+    createVerifier.setCleanup(true);
+    createVerifier.verifyPermission(ugi); // test overWritten
+  }
+
+  /* A class that verifies the permission checking is correct for open */
+  private class OpenPermissionVerifier extends PermissionVerifier {
+    @Override
+    void setOpPermission() {
+      this.opParentPermission = SEARCH_MASK;
+      this.opPermission = READ_MASK;
+    }
+
+    @Override
+    void call() throws IOException {
+      FSDataInputStream in = fs.open(path);
+      in.close();
+    }
+  }
+
+  private OpenPermissionVerifier openVerifier = new OpenPermissionVerifier();
+  /* test if the permission checking of open is correct */
+  private void testOpen(UnixUserGroupInformation ugi, Path path,
+      short ancestorPermission, short parentPermission, short filePermission)
+      throws Exception {
+    openVerifier
+        .set(path, ancestorPermission, parentPermission, filePermission);
+    openVerifier.verifyPermission(ugi);
+  }
+
+  /* A class that verifies the permission checking is correct for 
+   * setReplication */
+  private class SetReplicationPermissionVerifier extends PermissionVerifier {
+    @Override
+    void setOpPermission() {
+      this.opParentPermission = SEARCH_MASK;
+      this.opPermission = WRITE_MASK;
+    }
+
+    @Override
+    void call() throws IOException {
+      fs.setReplication(path, (short) 1);
+    }
+  }
+
+  private SetReplicationPermissionVerifier replicatorVerifier =
+    new SetReplicationPermissionVerifier();
+  /* test if the permission checking of setReplication is correct */
+  private void testSetReplication(UnixUserGroupInformation ugi, Path path,
+      short ancestorPermission, short parentPermission, short filePermission)
+      throws Exception {
+    replicatorVerifier.set(path, ancestorPermission, parentPermission,
+        filePermission);
+    replicatorVerifier.verifyPermission(ugi);
+  }
+
+  /* A class that verifies the permission checking is correct for isDirectory,
+   * exists, getFileInfo, getContentLength */
+  private class StatsPermissionVerifier extends PermissionVerifier {
+    OpType opType;
+
+    /* initialize */
+    void set(Path path, OpType opType, short ancestorPermission,
+        short parentPermission) {
+      super.set(path, ancestorPermission, parentPermission, NULL_MASK);
+      setOpType(opType);
+    }
+
+    /* set the operation: getFileInfo, isDirectory, exists, getContentLength */
+    void setOpType(OpType opType) {
+      this.opType = opType;
+    }
+
+    @Override
+    void setOpPermission() {
+      this.opParentPermission = SEARCH_MASK;
+    }
+
+    @Override
+    void call() throws IOException {
+      switch (opType) {
+      case GET_FILEINFO:
+        fs.getFileStatus(path);
+        break;
+      case IS_DIR:
+        fs.isDirectory(path);
+        break;
+      case EXISTS:
+        fs.exists(path);
+        break;
+      case GET_CONTENT_LENGTH:
+        fs.getContentLength(path);
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected operation type: "
+            + opType);
+      }
+    }
+  }
+
+  private StatsPermissionVerifier statsVerifier = new StatsPermissionVerifier();
+  /* test if the permission checking of isDirectory, exist,
+   * getFileInfo, getContentLength is correct */
+  private void testStats(UnixUserGroupInformation ugi, Path path,
+      short ancestorPermission, short parentPermission) throws Exception {
+    statsVerifier.set(path, OpType.GET_FILEINFO, ancestorPermission,
+        parentPermission);
+    statsVerifier.verifyPermission(ugi);
+    statsVerifier.setOpType(OpType.IS_DIR);
+    statsVerifier.verifyPermission(ugi);
+    statsVerifier.setOpType(OpType.EXISTS);
+    statsVerifier.verifyPermission(ugi);
+    statsVerifier.setOpType(OpType.GET_CONTENT_LENGTH);
+    statsVerifier.verifyPermission(ugi);
+  }
+
+  /* whether a path handed to the list verifier is a file or a directory */
+  private enum InodeType {
+    FILE,
+    DIR
+  }
+
+  /* Verifier for listStatus. The mask applied to the listed path depends
+   * on whether that path is a file or a directory. */
+  private class ListPermissionVerifier extends PermissionVerifier {
+    private InodeType inodeType;
+
+    /* Initialize the path, its inode type and the permissions. */
+    void set(Path path, InodeType inodeType, short ancestorPermission,
+        short parentPermission, short permission) {
+      // assigned before super.set; presumably super.set ends up calling
+      // setOpPermission, which reads inodeType -- confirm in the base class
+      this.inodeType = inodeType;
+      super.set(path, ancestorPermission, parentPermission, permission);
+    }
+
+    /* Point the verifier at another path of the given inode type and
+     * recompute the operation masks. */
+    void setInodeType(Path path, InodeType inodeType) {
+      this.inodeType = inodeType;
+      this.path = path;
+      setOpPermission();
+      // cached user is cleared; presumably this makes the next
+      // verifyPermission start afresh -- confirm in the base class
+      this.ugi = null;
+    }
+
+    /* Parent mask is always SEARCH_MASK; the target mask is 0 for a file
+     * and READ_MASK | SEARCH_MASK for a directory. */
+    @Override
+    void setOpPermission() {
+      opParentPermission = SEARCH_MASK;
+      switch (inodeType) {
+      case DIR:
+        opPermission = READ_MASK | SEARCH_MASK;
+        break;
+      case FILE:
+        opPermission = 0;
+        break;
+      default:
+        throw new IllegalArgumentException("Illegal inode type: " + inodeType);
+      }
+    }
+
+    /* issue the list operation; the returned listing is discarded */
+    @Override
+    void call() throws IOException {
+      fs.listStatus(path);
+    }
+  }
+
+  ListPermissionVerifier listVerifier = new ListPermissionVerifier();
+  /* test if the permission checking of list is correct */
+  private void testList(UnixUserGroupInformation ugi, Path file, Path dir,
+      short ancestorPermission, short parentPermission, short filePermission)
+      throws Exception {
+    listVerifier.set(file, InodeType.FILE, ancestorPermission,
+        parentPermission, filePermission);
+    listVerifier.verifyPermission(ugi);
+    listVerifier.setInodeType(dir, InodeType.DIR);
+    listVerifier.verifyPermission(ugi);
+  }
+
+  /* A class that verifies the permission checking is correct for rename.
+   * Besides the source-side permissions tracked by the base class, it
+   * keeps the ancestor and parent permissions of the destination and
+   * includes them in the deny expectation. */
+  private class RenamePermissionVerifier extends PermissionVerifier {
+    private Path dst;
+    private short dstAncestorPermission;
+    private short dstParentPermission;
+
+    /* Initialize source and destination. NULL_MASK is passed to the base
+     * class as the permission of the source path itself. */
+    void set(Path src, short srcAncestorPermission, short srcParentPermission,
+        Path dst, short dstAncestorPermission, short dstParentPermission) {
+      super.set(src, srcAncestorPermission, srcParentPermission, NULL_MASK);
+      this.dst = dst;
+      this.dstAncestorPermission = dstAncestorPermission;
+      this.dstParentPermission = dstParentPermission;
+    }
+
+    /* the parent-directory mask for rename is SEARCH_MASK | WRITE_MASK */
+    @Override
+    void setOpPermission() {
+      opParentPermission = SEARCH_MASK | WRITE_MASK;
+    }
+
+    /* issue the rename; the boolean result is not inspected */
+    @Override
+    void call() throws IOException {
+      fs.rename(path, dst);
+    }
+
+    /* A deny is expected when the base-class check expects one, or when
+     * the destination parent or destination ancestor permission does not
+     * contain every bit of the corresponding required permission. */
+    @Override
+    protected boolean expectPermissionDeny() {
+      return super.expectPermissionDeny()
+          || (requiredParentPermission & dstParentPermission) != 
+                requiredParentPermission
+          || (requiredAncestorPermission & dstAncestorPermission) != 
+                requiredAncestorPermission;
+    }
+
+    /* log the base-class permissions followed by the destination ones,
+     * printed in octal */
+    @Override
+    protected void logPermissions() {
+      super.logPermissions();
+      LOG.info("dst ancestor permission: "
+          + Integer.toOctalString(dstAncestorPermission));
+      LOG.info("dst parent permission: "
+          + Integer.toOctalString(dstParentPermission));
+    }
+  }
+
+  RenamePermissionVerifier renameVerifier = new RenamePermissionVerifier();
+  /* test if the permission checking of rename is correct */
+  private void testRename(UnixUserGroupInformation ugi, Path src, Path dst,
+      short srcAncestorPermission, short srcParentPermission,
+      short dstAncestorPermission, short dstParentPermission) throws Exception {
+    renameVerifier.set(src, srcAncestorPermission, srcParentPermission, dst,
+        dstAncestorPermission, dstParentPermission);
+    renameVerifier.verifyPermission(ugi);
+  }
+
+  /* Verifier for delete. The parent-directory mask is
+   * SEARCH_MASK | WRITE_MASK. */
+  private class DeletePermissionVerifier extends PermissionVerifier {
+    /* Initialize with NULL_MASK as the permission of the path itself. */
+    void set(Path path, short ancestorPermission, short parentPermission) {
+      super.set(path, ancestorPermission, parentPermission, NULL_MASK);
+    }
+
+    /* issue the delete; its result is discarded */
+    @Override
+    void call() throws IOException {
+      fs.delete(path);
+    }
+
+    /* record the parent-directory mask for this operation */
+    @Override
+    void setOpPermission() {
+      opParentPermission = SEARCH_MASK | WRITE_MASK;
+    }
+  }
+
+  /* Verifier for directory deletion. On top of the parent-directory mask
+   * it sets a mask for the directory itself and takes the permissions of
+   * the directory's children into account. */
+  private class DeleteDirPermissionVerifier extends DeletePermissionVerifier {
+    private short[] childPermissions;
+
+    /* Initialize the path and its permissions, then remember the child
+     * permissions (may be null). */
+    void set(Path path, short ancestorPermission, short parentPermission,
+        short permission, short[] childPermissions) {
+      set(path, ancestorPermission, parentPermission, permission);
+      this.childPermissions = childPermissions;
+    }
+
+    /* Parent mask is SEARCH_MASK | WRITE_MASK; the directory itself gets
+     * SEARCH_MASK | WRITE_MASK | READ_MASK. */
+    @Override
+    void setOpPermission() {
+      opPermission = SEARCH_MASK | WRITE_MASK | READ_MASK;
+      opParentPermission = SEARCH_MASK | WRITE_MASK;
+    }
+
+    /* A deny is expected when the inherited check expects one, or when any
+     * child permission lacks a bit of requiredPermission. */
+    @Override
+    protected boolean expectPermissionDeny() {
+      if (super.expectPermissionDeny()) {
+        return true;
+      }
+      if (childPermissions == null) {
+        return false;
+      }
+      for (short childPermission : childPermissions) {
+        final boolean childGrantsAll =
+            (requiredPermission & childPermission) == requiredPermission;
+        if (!childGrantsAll) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  DeletePermissionVerifier fileDeletionVerifier =
+    new DeletePermissionVerifier();
+
+  /* test if the permission checking of file deletion is correct */
+  private void testDeleteFile(UnixUserGroupInformation ugi, Path file,
+      short ancestorPermission, short parentPermission) throws Exception {
+    fileDeletionVerifier.set(file, ancestorPermission, parentPermission);
+    fileDeletionVerifier.verifyPermission(ugi);
+  }
+
+  DeleteDirPermissionVerifier dirDeletionVerifier =
+    new DeleteDirPermissionVerifier();
+
+  /* test if the permission checking of directory deletion is correct */
+  private void testDeleteDir(UnixUserGroupInformation ugi, Path path,
+      short ancestorPermission, short parentPermission, short permission,
+      short[] childPermissions) throws Exception {
+    dirDeletionVerifier.set(path, ancestorPermission, parentPermission,
+        permission, childPermissions);
+    dirDeletionVerifier.verifyPermission(ugi);
+
+  }
+
+  /* log into dfs as the given user */
+  private void login(UnixUserGroupInformation ugi) throws IOException {
+    if (fs != null) {
+      fs.close();
+    }
+    UnixUserGroupInformation.saveToConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+    fs = FileSystem.get(conf); // login as ugi
+  }
+
+  /* Test operations on a non-existent file. Each operation is attempted
+   * in turn; an IOException is tolerated as long as it is not a
+   * permission denial (see checkNoPermissionDeny). Only exists() has its
+   * result asserted; the results of the other calls are ignored. */
+  private void checkNonExistentFile() {
+    try {
+      assertFalse(fs.exists(NON_EXISTENT_FILE));
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      // close the stream right away so nothing leaks should the open
+      // unexpectedly succeed
+      fs.open(NON_EXISTENT_FILE).close();
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      fs.setReplication(NON_EXISTENT_FILE, (short)4);
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      fs.getFileStatus(NON_EXISTENT_FILE);
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      fs.getContentLength(NON_EXISTENT_FILE);
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      fs.listStatus(NON_EXISTENT_FILE);
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      fs.delete(NON_EXISTENT_FILE);
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+    try {
+      fs.rename(NON_EXISTENT_FILE, new Path(NON_EXISTENT_FILE+".txt"));
+    } catch (IOException e) {
+      checkNoPermissionDeny(e);
+    }
+  }
+  
+  private void checkNoPermissionDeny(IOException e) {
+    if (e instanceof RemoteException) {
+      assertFalse(PERMISSION_EXCEPTION_NAME.equals((
+          (RemoteException)e).getClassName()));
+    }
+  }
+}
\ No newline at end of file



Mime
View raw message