hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r609029 [1/2] - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/permission/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/security/ src/test/org/apache/hadoop/dfs/ src/test/org/ap...
Date Fri, 04 Jan 2008 22:12:17 GMT
Author: cutting
Date: Fri Jan  4 14:12:15 2008
New Revision: 609029

URL: http://svn.apache.org/viewvc?rev=609029&view=rev
Log:
HADOOP-1298.  Implement permissions for HDFS, disabled by default.  Contributed by Nicholas and taton.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PermissionChecker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SerialNumberManager.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/security/TestPermission.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/FsPermission.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/PermissionStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan  4 14:12:15 2008
@@ -56,6 +56,10 @@
 
     HADOOP-2336. Shell commands to modify file permissions. (rangadi)
 
+    HADOOP-1298. Implement file permissions for HDFS, disabled by
+    default.  Enable with dfs.permissions=true.
+    (Tsz Wo (Nicholas) & taton via cutting)
+
   IMPROVEMENTS
 
     HADOOP-2045.  Change committer list on website to a table, so that

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Jan  4 14:12:15 2008
@@ -280,6 +280,24 @@
 </property>
 
 <property>
+  <name>dfs.permissions</name>
+  <value>false</value>
+  <description>
+    If "true", enable permission checking in HDFS.
+    If "false", permission checking is turned off,
+    but all other behavior is unchanged.
+    Switching from one parameter value to the other does not change the mode,
+    owner or group of files or directories.
+  </description>
+</property>
+
+<property>
+  <name>dfs.permissions.supergroup</name>
+  <value>supergroup</value>
+  <description>The name of the group of super-users.</description>
+</property>
+
+<property>
   <name>dfs.client.buffer.dir</name>
   <value>${hadoop.tmp.dir}/dfs/tmp</value>
  <description>Determines where on the local filesystem a DFS client

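For reference, a minimal sketch of turning the new checking on programmatically. The two property names come from the entries above; the surrounding class and values are illustrative only, not part of this change.

    import org.apache.hadoop.conf.Configuration;

    public class EnablePermissions {
      public static void main(String[] args) {
        // Illustrative override of the two properties defined above;
        // checking ships disabled (dfs.permissions=false) by default.
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.permissions", true);
        conf.set("dfs.permissions.supergroup", "supergroup");
        System.out.println("permission checking enabled: "
            + conf.getBoolean("dfs.permissions", false));
      }
    }
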
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Fri Jan  4 14:12:15 2008
@@ -2074,7 +2074,7 @@
   }
   
   void updateBlockLevelStats(String path, BlockLevelStats stats) {
-    DFSFileInfo[] fileArr = getFSNamesystem().getListing(path);
+    DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
     
     for (DFSFileInfo file:fileArr) {
       if (file.isDir()) {
@@ -2083,7 +2083,7 @@
        // Get all the blocks.
         LocatedBlocks blockLoc = null;
         try {
-          blockLoc = getFSNamesystem().getBlockLocations(null,
+          blockLoc = getFSNamesystem().getBlockLocationsInternal(null,
               file.getPath().toString(), 0, file.getLen());
           int numBlocks = blockLoc.locatedBlockCount();
           for (int i=0; i<numBlocks; i++) {
@@ -2142,7 +2142,7 @@
   
   private int deleteCrcFiles(String path) {
     // Recursively deletes files
-    DFSFileInfo[] fileArr = getFSNamesystem().getListing(path);
+    DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
     
     int numFilesDeleted = 0;
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Fri Jan  4 14:12:15 2008
@@ -20,6 +20,7 @@
 import java.io.*;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
+import org.apache.hadoop.fs.permission.*;
 
 /**********************************************************************
  * ClientProtocol is used by a piece of DFS user code to communicate 
@@ -38,8 +39,9 @@
    * 20 : getContentLength returns the total size in bytes of a directory subtree
    * 21 : add lease holder as a parameter in abandonBlock(...)
    * 22 : Serialization of FileStatus has changed.
+   * 23 : added setOwner(...) and setPermission(...); changed create(...) and mkdir(...)
    */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
   
   ///////////////////////////////////////
   // File contents
@@ -95,8 +97,16 @@
    * Blocks have a maximum size.  Clients that intend to
    * create multi-block files must also use reportWrittenBlock()
    * and addBlock().
+   *
+   * If permission is denied,
+   * an {@link AccessControlException} will be thrown as an
+   * {@link org.apache.hadoop.ipc.RemoteException}.
+   *
+   * @param src The path of the file being created
+   * @param masked The masked permission
    */
   public void create(String src, 
+                     FsPermission masked,
                              String clientName, 
                              boolean overwrite, 
                              short replication,
@@ -122,6 +132,22 @@
                                 ) throws IOException;
 
   /**
+   * Set permissions for an existing file/directory.
+   */
+  public void setPermission(String src, FsPermission permission
+      ) throws IOException;
+
+  /**
+   * Set owner of a path (i.e. a file or a directory).
+   * The parameters username and groupname cannot both be null.
+   * @param src
+   * @param username If it is null, the original username remains unchanged.
+   * @param groupname If it is null, the original groupname remains unchanged.
+   */
+  public void setOwner(String src, String username, String groupname
+      ) throws IOException;
+
+  /**
    * If the client has not yet called reportWrittenBlock(), it can
    * give up on it by calling abandonBlock().  The client can then
    * either obtain a new block, or complete or abandon the file.
@@ -203,9 +229,17 @@
 
   /**
    * Create a directory (or hierarchy of directories) with the given
-   * name.
+   * name and permission.
+   *
+   * If permission is denied,
+   * an {@link AccessControlException} will be thrown as an
+   * {@link org.apache.hadoop.ipc.RemoteException}.
+   *
+   * @param src The path of the directory being created
+   * @param masked The masked permission of the directory being created
+   * @return True if the operation succeeds.
    */
-  public boolean mkdirs(String src) throws IOException;
+  public boolean mkdirs(String src, FsPermission masked) throws IOException;
 
   /**
    * Get a listing of the indicated directory

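As the new javadoc above notes, a denied check surfaces on the client as an ipc.RemoteException wrapping an AccessControlException. A hypothetical caller-side sketch of detecting that case; the helper class and parameter values are made up, and it assumes ClientProtocol is package-private so the caller must live in org.apache.hadoop.dfs.

    package org.apache.hadoop.dfs; // assumption: ClientProtocol is package-private

    import java.io.IOException;
    import org.apache.hadoop.fs.permission.AccessControlException;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.ipc.RemoteException;

    class CreateChecked {
      // Hypothetical helper: create a file and report a permission denial
      // before rethrowing the wrapped exception.
      static void create(ClientProtocol namenode, String src, FsPermission masked,
                         String clientName) throws IOException {
        try {
          namenode.create(src, masked, clientName, false, (short)3, 64L * 1024 * 1024);
        } catch (RemoteException re) {
          if (AccessControlException.class.getName().equals(re.getClassName())) {
            System.err.println("permission denied: " + src);
          }
          throw re;
        }
      }
    }
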
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Jan  4 14:12:15 2008
@@ -22,6 +22,7 @@
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.conf.*;
@@ -333,17 +334,37 @@
         conf.getInt("io.file.buffer.size", 4096));
   }
   /**
+   * Call
+   * {@link #create(String,FsPermission,boolean,short,long,Progressable,int)}
+   * with default permission.
+   * @see FsPermission#getDefault(Configuration)
+   */
+  public OutputStream create(String src,
+      boolean overwrite,
+      short replication,
+      long blockSize,
+      Progressable progress,
+      int buffersize
+      ) throws IOException {
+    return create(src, FsPermission.getDefault(),
+        overwrite, replication, blockSize, progress, buffersize);
+  }
+  /**
    * Create a new dfs file with the specified block replication 
    * with write-progress reporting and return an output stream for writing
    * into the file.  
    * 
    * @param src stream name
+   * @param permission The permission of the file being created.
+   * If permission == null, use {@link FsPermission#getDefault()}.
    * @param overwrite do not check for file existence if true
    * @param replication block replication
    * @return output stream
    * @throws IOException
+   * @see {@link ClientProtocol#create(String, FsPermission, String, boolean, short, long)}
    */
   public OutputStream create(String src, 
+                             FsPermission permission,
                              boolean overwrite, 
                              short replication,
                              long blockSize,
@@ -351,8 +372,13 @@
                              int buffersize
                              ) throws IOException {
     checkOpen();
-    OutputStream result = new DFSOutputStream(
-        src, overwrite, replication, blockSize, progress, buffersize);
+    if (permission == null) {
+      permission = FsPermission.getDefault();
+    }
+    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
+    LOG.debug(src + ": masked=" + masked);
+    OutputStream result = new DFSOutputStream(src, masked,
+        overwrite, replication, blockSize, progress, buffersize);
     synchronized (pendingCreates) {
       pendingCreates.put(src.toString(), result);
     }
@@ -491,7 +517,27 @@
    */
   public boolean mkdirs(String src) throws IOException {
     checkOpen();
-    return namenode.mkdirs(src);
+    return namenode.mkdirs(src, null);
+  }
+
+  /**
+   * Create a directory (or hierarchy of directories) with the given
+   * name and permission.
+   *
+   * @param src The path of the directory being created
+   * @param permission The permission of the directory being created.
+   * If permission == null, use {@link FsPermission#getDefault()}.
+   * @return True if the operation succeeds.
+   * @see {@link ClientProtocol#mkdirs(String, FsPermission)}
+   */
+  public boolean mkdirs(String src, FsPermission permission) throws IOException {
+    checkOpen();
+    if (permission == null) {
+      permission = FsPermission.getDefault();
+    }
+    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
+    LOG.debug(src + ": masked=" + masked);
+    return namenode.mkdirs(src, masked);
   }
 
   /**
@@ -1394,8 +1440,10 @@
     private Progressable progress;
     /**
      * Create a new output stream to the given DataNode.
+     * @see {@link ClientProtocol#create(String, FsPermission, String, boolean, short, long)}
      */
-    public DFSOutputStream(String src, boolean overwrite, 
+    public DFSOutputStream(String src, FsPermission masked,
+                           boolean overwrite,
                            short replication, long blockSize,
                            Progressable progress,
                            int buffersize
@@ -1422,7 +1470,7 @@
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
       namenode.create(
-          src.toString(), clientName, overwrite, replication, blockSize);
+          src, masked, clientName, overwrite, replication, blockSize);
     }
 
     private void openBackupStream() throws IOException {

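Worth noting in the hunks above: DFSClient applies the client-side umask before the RPC, so the NameNode only ever sees the masked mode. A small sketch of that arithmetic, assuming the usual masked = permission & ~umask semantics of FsPermission.applyUMask:

    import org.apache.hadoop.fs.permission.FsPermission;

    public class UMaskDemo {
      public static void main(String[] args) {
        FsPermission requested = new FsPermission((short)0666); // rw-rw-rw-
        FsPermission umask     = new FsPermission((short)0022); // conventional default
        // 0666 & ~0022 = 0644 (rw-r--r--): this is the "masked" value that
        // DFSClient.create() ultimately passes to namenode.create().
        System.out.println(requested + " with umask " + umask + " -> "
            + requested.applyUMask(umask));
      }
    }
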
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java Fri Jan  4 14:12:15 2008
@@ -52,7 +52,11 @@
           node.isDirectory(), 
           node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
           node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
-          node.getModificationTime(), new Path(path));
+          node.getModificationTime(),
+          node.getFsPermission(),
+          node.getUserName(),
+          node.getGroupName(),
+          new Path(path));
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Fri Jan  4 14:12:15 2008
@@ -119,9 +119,8 @@
     int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
 
-    return new FSDataOutputStream( dfs.create(getPathName(f), overwrite, 
-                                              replication, blockSize, 
-                                              progress, bufferSize) );
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
+        overwrite, replication, blockSize, progress, bufferSize));
   }
 
   public boolean setReplication(Path src, 
@@ -176,7 +175,7 @@
   }
 
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return dfs.mkdirs(getPathName(f));
+    return dfs.mkdirs(getPathName(f), permission);
   }
 
   public void close() throws IOException {
@@ -332,5 +331,20 @@
       DFSFileInfo p = dfs.getFileInfo(getPathName(f));
       return p;
     }
+  }
+
+  /** {@inheritDoc }*/
+  public void setPermission(Path p, FsPermission permission
+      ) throws IOException {
+    dfs.namenode.setPermission(getPathName(p), permission);
+  }
+
+  /** {@inheritDoc }*/
+  public void setOwner(Path p, String username, String groupname
+      ) throws IOException {
+    if (username == null && groupname == null) {
+      throw new IOException("username == null && groupname == null");
+    }
+    dfs.namenode.setOwner(getPathName(p), username, groupname);
   }
 }

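With these overrides in place, user code can chmod/chown through the generic FileSystem API. A hedged usage sketch (the path and group name are made up); note the null username, which per the setOwner contract above leaves the owner unchanged:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class ChmodChown {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/user/alice/data");          // hypothetical path
        fs.setPermission(p, new FsPermission((short)0640));
        fs.setOwner(p, null, "analysts");               // change group only
      }
    }
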
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Fri Jan  4 14:12:15 2008
@@ -179,7 +179,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -10;
+  public static final int LAYOUT_VERSION = -11;
   // Current version: 
-  // a directory has a block list length of -1
+  // Added permission information to INode.
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Jan  4 14:12:15 2008
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -39,7 +40,7 @@
 class FSDirectory implements FSConstants {
 
   FSNamesystem namesystem = null;
-  INodeDirectory rootDir = new INodeDirectory(INodeDirectory.ROOT_NAME);
+  final INodeDirectory rootDir;
   FSImage fsImage;  
   boolean ready = false;
   // Metrics record
@@ -47,12 +48,12 @@
     
   /** Access an existing dfs name directory. */
   public FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
-    this.fsImage = new FSImage();
-    namesystem = ns;
-    initialize(conf);
+    this(new FSImage(), ns, conf);
   }
 
   public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
+    rootDir = new INodeDirectory(INodeDirectory.ROOT_NAME,
+        ns.createFsOwnerPermissions(new FsPermission((short)0755)));
     this.fsImage = fsImage;
     namesystem = ns;
     initialize(conf);
@@ -116,6 +117,7 @@
    * Add the given filename to the fs.
    */
   INode addFile(String path, 
+                PermissionStatus permissions,
                 short replication,
                 long preferredBlockSize,
                 String clientName,
@@ -126,10 +128,11 @@
 
     // Always do an implicit mkdirs for parent directory tree.
     long modTime = FSNamesystem.now();
-    if (!mkdirs(new Path(path).getParent().toString(), modTime)) {
+    if (!mkdirs(new Path(path).getParent().toString(), permissions, true,
+        modTime)) {
       return null;
     }
-    INodeFile newNode = new INodeFileUnderConstruction(replication, 
+    INodeFile newNode = new INodeFileUnderConstruction(permissions,replication,
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
     synchronized (rootDir) {
@@ -155,16 +158,17 @@
   /**
    */
   INode unprotectedAddFile( String path, 
+                            PermissionStatus permissions,
                             Block[] blocks, 
                             short replication,
                             long modificationTime,
                             long preferredBlockSize) {
     INode newNode;
     if (blocks == null)
-      newNode = new INodeDirectory(modificationTime);
+      newNode = new INodeDirectory(permissions, modificationTime);
     else
-      newNode = new INodeFile(blocks.length, replication, modificationTime,
-                              preferredBlockSize);
+      newNode = new INodeFile(permissions, blocks.length, replication,
+          modificationTime, preferredBlockSize);
     synchronized (rootDir) {
       try {
         newNode = rootDir.addNode(path, newNode);
@@ -376,6 +380,47 @@
       return ((INodeFile)fileNode).getPreferredBlockSize();
     }
   }
+
+  boolean exists(String src) {
+    src = normalizePath(src);
+    synchronized(rootDir) {
+      INode inode = rootDir.getNode(src);
+      if (inode == null) {
+         return false;
+      }
+      return inode.isDirectory()? true: ((INodeFile)inode).getBlocks() != null;
+    }
+  }
+
+  void setPermission(String src, FsPermission permission
+      ) throws IOException {
+    unprotectedSetPermission(src, permission);
+    fsImage.getEditLog().logSetPermissions(src, permission);
+  }
+
+  void unprotectedSetPermission(String src, FsPermission permissions) {
+    synchronized(rootDir) {
+      rootDir.getNode(src).setPermission(permissions);
+    }
+  }
+
+  void setOwner(String src, String username, String groupname
+      ) throws IOException {
+    unprotectedSetOwner(src, username, groupname);
+    fsImage.getEditLog().logSetOwner(src, username, groupname);
+  }
+
+  void unprotectedSetOwner(String src, String username, String groupname) {
+    synchronized(rootDir) {
+      INode inode = rootDir.getNode(src);
+      if (username != null) {
+        inode.setUser(username);
+      }
+      if (groupname != null) {
+        inode.setGroup(groupname);
+      }
+    }
+  }
     
   /**
    * Remove the file from management, return blocks
@@ -551,7 +596,8 @@
   /**
    * Create directory entries for every item
    */
-  boolean mkdirs(String src, long now) {
+  boolean mkdirs(String src, PermissionStatus permissions,
+      boolean inheritPermission, long now) {
     src = normalizePath(src);
 
     // Use this to collect all the dirs we need to construct
@@ -573,7 +619,8 @@
     for (int i = numElts - 1; i >= 0; i--) {
       String cur = v.get(i);
       try {
-        INode inserted = unprotectedMkdir(cur, now);
+        INode inserted = unprotectedMkdir(cur, permissions,
+            inheritPermission || i != 0, now);
         if (inserted != null) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
                                         +"created directory "+cur);
@@ -596,9 +643,11 @@
 
   /**
    */
-  INode unprotectedMkdir(String src, long timestamp) throws FileNotFoundException {
+  INodeDirectory unprotectedMkdir(String src, PermissionStatus permissions,
+      boolean inheritPermission, long timestamp) throws FileNotFoundException {
     synchronized (rootDir) {
-      return rootDir.addNode(src, new INodeDirectory(timestamp));
+      return rootDir.addNode(src, new INodeDirectory(permissions, timestamp),
+          inheritPermission);
     }
   }
 

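One subtlety in the mkdirs hunk above: the loop walks from the deepest missing directory outward and passes inheritPermission || i != 0, so only the final component (i == 0) receives the caller's permission; implicitly created ancestors inherit from their parents. A hypothetical client-visible consequence:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class MkdirsDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // If /user/alice exists but logs/ and 2008/ do not: 2008/ gets 0750
        // (after umask), while logs/ inherits its mode from /user/alice.
        fs.mkdirs(new Path("/user/alice/logs/2008"), new FsPermission((short)0750));
      }
    }
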
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Fri Jan  4 14:12:15 2008
@@ -31,9 +31,8 @@
 import java.lang.Math;
 import java.nio.channels.FileChannel;
 
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -49,6 +48,8 @@
  //the following two are used only for backward compatibility:
   @Deprecated private static final byte OP_DATANODE_ADD = 5;
   @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
+  private static final byte OP_SET_PERMISSIONS = 7;
+  private static final byte OP_SET_OWNER = 8;
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -470,10 +471,14 @@
                 blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
               }
             }
+            PermissionStatus permissions = PermissionChecker.ANONYMOUS;
+            if (logVersion <= -11) {
+              permissions = PermissionStatus.read(in);
+            }
 
             // add to the file tree
-            fsDir.unprotectedAddFile(name.toString(), blocks, replication, 
-                                     mtime, blockSize);
+            fsDir.unprotectedAddFile(name.toString(), permissions,
+                blocks, replication, mtime, blockSize);
             break;
           }
           case OP_SET_REPLICATION: {
@@ -535,6 +540,7 @@
           }
           case OP_MKDIR: {
             UTF8 src = null;
+            PermissionStatus permissions = PermissionChecker.ANONYMOUS;
             if (logVersion >= -4) {
               src = new UTF8();
               src.readFields(in);
@@ -550,8 +556,12 @@
               }
               src = (UTF8) writables[0];
               timestamp = Long.parseLong(((UTF8)writables[1]).toString());
+
+              if (logVersion <= -11) {
+                permissions = PermissionStatus.read(in);
+              }
             }
-            fsDir.unprotectedMkdir(src.toString(), timestamp);
+            fsDir.unprotectedMkdir(src.toString(),permissions,false,timestamp);
             break;
           }
           case OP_DATANODE_ADD: {
@@ -572,6 +582,22 @@
             //Datanodes are not persistent any more.
             break;
           }
+          case OP_SET_PERMISSIONS: {
+            if (logVersion > -11)
+              throw new IOException("Unexpected opcode " + opcode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetPermission(
+                readUTF8String(in), FsPermission.read(in));
+            break;
+          }
+          case OP_SET_OWNER: {
+            if (logVersion > -11)
+              throw new IOException("Unexpected opcode " + opcode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetOwner(
+                readUTF8String(in), readUTF8String(in), readUTF8String(in));
+            break;
+          }
           default: {
             throw new IOException("Never seen opcode " + opcode);
           }
@@ -587,6 +613,13 @@
     return numEdits;
   }
   
+  private String readUTF8String(DataInputStream in) throws IOException {
+    UTF8 utf8 = new UTF8();
+    utf8.readFields(in);
+    String s = utf8.toString();
+    return s.length() == 0? null: s;
+  }
+
   static short adjustReplication(short replication) {
     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
     short minReplication = fsNamesys.getMinReplication();
@@ -750,7 +783,8 @@
       FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
     logEdit(OP_ADD,
             new ArrayWritable(UTF8.class, nameReplicationPair), 
-            new ArrayWritable(Block.class, newNode.getBlocks()));
+            new ArrayWritable(Block.class, newNode.getBlocks()),
+            newNode.getPermissionStatus());
   }
   
   /** 
@@ -761,7 +795,8 @@
       new UTF8(path),
       FSEditLog.toLogLong(newNode.getModificationTime())
     };
-    logEdit(OP_MKDIR, new ArrayWritable(UTF8.class, info));
+    logEdit(OP_MKDIR, new ArrayWritable(UTF8.class, info),
+        newNode.getPermissionStatus());
   }
   
   /** 
@@ -785,6 +820,18 @@
             FSEditLog.toLogReplication(replication));
   }
   
+  /**  Add set permissions record to edit log */
+  void logSetPermissions(String src, FsPermission permissions) {
+    logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions);
+  }
+
+  /**  Add set owner record to edit log */
+  void logSetOwner(String src, String username, String groupname) {
+    UTF8 u = new UTF8(username == null? "": username);
+    UTF8 g = new UTF8(groupname == null? "": groupname);
+    logEdit(OP_SET_OWNER, new UTF8(src), u, g);
+  }
+
   /** 
    * Add delete file record to edit log
    */

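The loader keeps old logs replayable by gating every new field on the layout version (versions are negative and decrease over time). A trivial sketch of the test used above; the variable names mirror the patch, but the class itself is illustrative:

    public class VersionGate {
      public static void main(String[] args) {
        int logVersion = -10;  // pretend we are replaying a pre-permission log
        // Records written at layout version -11 or later carry a
        // PermissionStatus; older ones fall back to PermissionChecker.ANONYMOUS.
        boolean hasPermissions = logVersion <= -11;
        System.out.println("log version " + logVersion
            + " carries permissions: " + hasPermissions);
      }
    }
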
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Fri Jan  4 14:12:15 2008
@@ -36,6 +36,7 @@
 import java.util.Random;
 import java.lang.Math;
 
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.io.UTF8;
@@ -711,8 +712,12 @@
             blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
           }
         }
-        fsDir.unprotectedAddFile(name.toString(), blocks, replication,
-                                 modificationTime, blockSize);
+        PermissionStatus permissions = PermissionChecker.ANONYMOUS;
+        if (imgVersion <= -11) {
+          permissions = PermissionStatus.read(in);
+        }
+        fsDir.unprotectedAddFile(name.toString(), permissions,
+            blocks, replication, modificationTime, blockSize);
       }
       
       // load datanode info
@@ -847,6 +852,7 @@
         out.writeInt(blocks.length);
         for (Block blk : blocks)
           blk.write(out);
+        fileINode.getPermissionStatus().write(out);
         return;
       }
       // write directory inode
@@ -854,6 +860,7 @@
       out.writeLong(inode.getModificationTime());
       out.writeLong(0);   // preferred block size
       out.writeInt(-1);    // # of blocks
+      inode.getPermissionStatus().write(out);
     }
     for(INode child : ((INodeDirectory)inode).getChildren()) {
       saveImage(fullName, child, out);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Jan  4 14:12:15 2008
@@ -21,11 +21,14 @@
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
 
 import java.io.BufferedWriter;
@@ -38,6 +41,8 @@
 import java.util.Map.Entry;
 import java.text.SimpleDateFormat;
 
+import javax.security.auth.login.LoginException;
+
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
  * DataNode.
@@ -53,6 +58,10 @@
 class FSNamesystem implements FSConstants {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
 
+  private boolean isPermissionEnabled;
+  private UserGroupInformation fsOwner;
+  private String supergroup;
+
   //
   // Stores the correct file name hierarchy
   //
@@ -204,12 +213,10 @@
   private static final SimpleDateFormat DATE_FORM =
     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-
   /**
    * FSNamesystem constructor.
    */
   FSNamesystem(NameNode nn, Configuration conf) throws IOException {
-    fsNamesystemObject = this;
     try {
       initialize(nn, conf);
     } catch(IOException e) {
@@ -285,7 +292,6 @@
    * is stored
    */
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
-    fsNamesystemObject = this;
     setConfigurationParameters(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
   }
@@ -295,6 +301,19 @@
    */
   private void setConfigurationParameters(Configuration conf) 
                                           throws IOException {
+    fsNamesystemObject = this;
+    try {
+      fsOwner = UnixUserGroupInformation.login(conf);
+    } catch (LoginException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+    LOG.info("fsOwner=" + fsOwner);
+
+    this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
+    this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
+    LOG.info("supergroup=" + supergroup);
+    LOG.info("isPermissionEnabled=" + isPermissionEnabled);
+
     this.replicator = new ReplicationTargetChooser(
                          conf.getBoolean("dfs.replication.considerLoad", true),
                          this,
@@ -645,12 +664,48 @@
   //
   /////////////////////////////////////////////////////////
   /**
+   * Set permissions for an existing file.
+   * @throws IOException
+   */
+  public synchronized void setPermission(String src, FsPermission permission
+      ) throws IOException {
+    checkOwner(src);
+    dir.setPermission(src, permission);
+    getEditLog().logSync();
+  }
+
+  /**
+   * Set owner for an existing file.
+   * @throws IOException
+   */
+  public synchronized void setOwner(String src, String username, String group
+      ) throws IOException {
+    PermissionChecker pc = checkOwner(src);
+    if (!pc.isSuper) {
+      if (username != null && !pc.user.equals(username)) {
+        throw new AccessControlException("Non-super user cannot change owner.");
+      }
+      if (group != null && !pc.containsGroup(group)) {
+        throw new AccessControlException("User does not belong to " + group
+            + " .");
+      }
+    }
+    dir.setOwner(src, username, group);
+    getEditLog().logSync();
+  }
+
+  /**
    * Get block locations within the specified range.
    * 
    * @see ClientProtocol#open(String, long, long)
    * @see ClientProtocol#getBlockLocations(String, long, long)
    */
-  LocatedBlocks getBlockLocations(String clientMachine,
+  LocatedBlocks getBlockLocations(String clientMachine, String src,
+      long offset, long length) throws IOException {
+    checkPathAccess(src, FsAction.READ);
+    return getBlockLocationsInternal(clientMachine, src, offset, length);
+  }
+  LocatedBlocks getBlockLocationsInternal(String clientMachine,
                                   String src, 
                                   long offset, 
                                   long length
@@ -663,7 +718,7 @@
     }
 
     DatanodeDescriptor client = null;
-    LocatedBlocks blocks =  getBlockLocations(dir.getFileINode(src), 
+    LocatedBlocks blocks =  getBlockLocationInternal(dir.getFileINode(src),
                                               offset, length, 
                                               Integer.MAX_VALUE);
     if (blocks == null) {
@@ -679,7 +734,7 @@
     return blocks;
   }
   
-  private synchronized LocatedBlocks getBlockLocations(INodeFile inode, 
+  private synchronized LocatedBlocks getBlockLocationInternal(INodeFile inode,
                                                        long offset, 
                                                        long length,
                                                        int nrBlocksToReturn) {
@@ -760,6 +815,7 @@
     if (isInSafeMode())
       throw new SafeModeException("Cannot set replication for " + src, safeMode);
     verifyReplication(src, replication, null);
+    checkPathAccess(src, FsAction.WRITE);
 
     int[] oldReplication = new int[1];
     Block[] fileBlocks;
@@ -786,7 +842,8 @@
     return true;
   }
     
-  public long getPreferredBlockSize(String filename) throws IOException {
+  long getPreferredBlockSize(String filename) throws IOException {
+    checkTraverse(filename);
     return dir.getPreferredBlockSize(filename);
   }
     
@@ -811,10 +868,11 @@
                             text + " is less than the required minimum " + minReplication);
   }
 
-  void startFile(String src, String holder, String clientMachine, 
+  void startFile(String src, PermissionStatus permissions,
+                 String holder, String clientMachine,
                  boolean overwrite, short replication, long blockSize
                 ) throws IOException {
-    startFileInternal(src, holder, clientMachine, overwrite,
+    startFileInternal(src, permissions, holder, clientMachine, overwrite,
                       replication, blockSize);
     getEditLog().logSync();
   }
@@ -830,7 +888,8 @@
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(String)}.
    */
-  synchronized void startFileInternal(String src, 
+  private synchronized void startFileInternal(String src,
+                                              PermissionStatus permissions,
                                               String holder, 
                                               String clientMachine, 
                                               boolean overwrite,
@@ -844,6 +903,13 @@
     if (!isValidName(src)) {
       throw new IOException("Invalid file name: " + src);      	  
     }
+    if (overwrite && exists(src)) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+    else {
+      checkAncestorAccess(src, FsAction.WRITE);
+    }
+
     try {
       INode myFile = dir.getFileINode(src);
       if (myFile != null && myFile.isUnderConstruction()) {
@@ -934,10 +1000,8 @@
       // Now we can add the name to the filesystem. This file has no
       // blocks associated with it.
       //
-      INode newNode = dir.addFile(src, replication, blockSize,
-                                  holder, 
-                                  clientMachine, 
-                                  clientNode);
+      INode newNode = dir.addFile(src, permissions,
+          replication, blockSize, holder, clientMachine, clientNode);
       if (newNode == null) {
         throw new IOException("DIR* NameSystem.startFile: " +
                               "Unable to add file to namespace.");
@@ -1287,22 +1351,25 @@
   // are made, edit namespace and return to client.
   ////////////////////////////////////////////////////////////////
 
+  /** Change the indicated filename. */
   public boolean renameTo(String src, String dst) throws IOException {
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     return status;
   }
 
-  /**
-   * Change the indicated filename.
-   */
-  public synchronized boolean renameToInternal(String src, String dst) throws IOException {
+  private synchronized boolean renameToInternal(String src, String dst
+      ) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
     if (isInSafeMode())
       throw new SafeModeException("Cannot rename " + src, safeMode);
     if (!isValidName(dst)) {
       throw new IOException("Invalid name: " + dst);
     }
+
+    checkParentAccess(src, FsAction.WRITE);
+    checkAncestorAccess(dst, FsAction.WRITE);
+
     return dir.renameTo(src, dst);
   }
 
@@ -1311,7 +1378,7 @@
    * invalidate some blocks that make up the file.
    */
   public boolean delete(String src) throws IOException {
-    boolean status = deleteInternal(src, true);
+    boolean status = deleteInternal(src, true, true);
     getEditLog().logSync();
     return status;
   }
@@ -1320,7 +1387,7 @@
    * An internal delete function that does not enforce safe mode
    */
   boolean deleteInSafeMode(String src) throws IOException {
-    boolean status = deleteInternal(src, false);
+    boolean status = deleteInternal(src, false, false);
     getEditLog().logSync();
     return status;
   }
@@ -1329,11 +1396,14 @@
    * invalidate some blocks that make up the file.
    */
   private synchronized boolean deleteInternal(String src, 
-                                              boolean enforceSafeMode) 
-                                              throws IOException {
+      boolean enforceSafeMode, boolean enforcePermission) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     if (enforceSafeMode && isInSafeMode())
       throw new SafeModeException("Cannot delete " + src, safeMode);
+    if (enforcePermission) {
+      checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+    }
+
     Block deletedBlocks[] = dir.delete(src);
     if (deletedBlocks != null) {
       for (int i = 0; i < deletedBlocks.length; i++) {
@@ -1356,27 +1426,26 @@
   /**
    * Return whether the given filename exists
    */
-  public boolean exists(String src) {
-    if (dir.getFileBlocks(src) != null || dir.isDir(src)) {
-      return true;
-    } else {
-      return false;
-    }
+  public boolean exists(String src) throws AccessControlException {
+    checkTraverse(src);
+    return dir.exists(src);
   }
 
   /**
    * Whether the given name is a directory
    */
-  public boolean isDir(String src) {
+  public boolean isDir(String src) throws AccessControlException {
+    checkTraverse(src);
     return dir.isDir(src);
   }
 
-  /* Get the file info for a specific file.
+  /** Get the file info for a specific file.
    * @param src The string representation of the path to the file
    * @throws IOException if file does not exist
    * @return object containing information regarding the file
    */
   DFSFileInfo getFileInfo(String src) throws IOException {
+    checkTraverse(src);
     return dir.getFileInfo(src);
   }
 
@@ -1407,8 +1476,9 @@
   /**
    * Create all the necessary directories
    */
-  public boolean mkdirs(String src) throws IOException {
-    boolean status = mkdirsInternal(src);
+  public boolean mkdirs(String src, PermissionStatus permissions
+      ) throws IOException {
+    boolean status = mkdirsInternal(src, permissions);
     getEditLog().logSync();
     return status;
   }
@@ -1416,19 +1486,20 @@
   /**
    * Create all the necessary directories
    */
-  private synchronized boolean mkdirsInternal(String src) throws IOException {
-    boolean    success;
+  private synchronized boolean mkdirsInternal(String src,
+      PermissionStatus permissions) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     if (isInSafeMode())
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     if (!isValidName(src)) {
       throw new IOException("Invalid directory name: " + src);
     }
-    success = dir.mkdirs(src, now());
-    if (!success) {
+    checkAncestorAccess(src, FsAction.WRITE);
+
+    if (!dir.mkdirs(src, permissions, false, now())) {
       throw new IOException("Invalid directory name: " + src);
     }
-    return success;
+    return true;
   }
 
   /* Get the size of the specified directory subtree.
@@ -1437,6 +1508,7 @@
    * @return size in bytes
    */
   long getContentLength(String src) throws IOException {
+    checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
     return dir.getContentLength(src);
   }
 
@@ -1660,7 +1732,13 @@
    * Get a listing of all files at 'src'.  The Object[] array
    * exists so we can return file attributes (soon to be implemented)
    */
-  public DFSFileInfo[] getListing(String src) {
+  public DFSFileInfo[] getListing(String src) throws IOException {
+    if (dir.isDir(src)) {
+      checkPathAccess(src, FsAction.READ);
+    }
+    else {
+      checkTraverse(src);
+    }
     return dir.getListing(src);
   }
 
@@ -3754,5 +3832,51 @@
 
   boolean startDistributedUpgradeIfNeeded() throws IOException {
     return upgradeManager.startUpgrade();
+  }
+
+  PermissionStatus createFsOwnerPermissions(FsPermission permission) {
+    return new PermissionStatus(fsOwner.getUserName(), supergroup, permission);
+  }
+
+  private PermissionChecker checkOwner(String path) throws AccessControlException {
+    return checkPermission(path, true, null, null, null, null);
+  }
+
+  private PermissionChecker checkPathAccess(String path, FsAction access
+      ) throws AccessControlException {
+    return checkPermission(path, false, null, null, access, null);
+  }
+
+  private PermissionChecker checkParentAccess(String path, FsAction access
+      ) throws AccessControlException {
+    return checkPermission(path, false, null, access, null, null);
+  }
+
+  private PermissionChecker checkAncestorAccess(String path, FsAction access
+      ) throws AccessControlException {
+    return checkPermission(path, false, access, null, null, null);
+  }
+
+  private PermissionChecker checkTraverse(String path
+      ) throws AccessControlException {
+    return checkPermission(path, false, null, null, null, null);
+  }
+
+  /**
+   * Check whether the current user has permission to access the path.
+   * For more details of the parameters, see
+   * {@link PermissionChecker#checkPermission(INodeDirectory, boolean, FsAction, FsAction, FsAction)}.
+   */
+  private PermissionChecker checkPermission(String path, boolean doCheckOwner,
+      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+      FsAction subAccess) throws AccessControlException {
+    PermissionChecker pc = new PermissionChecker(
+        fsOwner.getUserName(), supergroup);
+    if (isPermissionEnabled && !pc.isSuper) {
+      dir.waitForReady();
+      pc.checkPermission(path, dir.rootDir, doCheckOwner,
+          ancestorAccess, parentAccess, access, subAccess);
+    }
+    return pc;
   }
 }

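The five thin wrappers above all funnel into one checkPermission(path, doCheckOwner, ancestorAccess, parentAccess, access, subAccess), each filling a different slot. A commented recap, condensed from the calls in the hunks above (alignment and grouping mine, not new code):

    // checkOwner          -> (true,  -,     -,     -,    -  ): setPermission, setOwner
    // checkAncestorAccess -> (false, WRITE, -,     -,    -  ): startFile, mkdirs, rename dst
    // checkParentAccess   -> (false, -,     WRITE, -,    -  ): rename src
    // checkPathAccess     -> (false, -,     -,     READ, -  ): getBlockLocations, getListing
    //                        (false, -,     -,     WRITE,-  ): setReplication
    // delete              -> (false, -,     WRITE, -,    ALL): WRITE on parent, ALL on subtree
    // getContentLength    -> (false, -,     -,     -,    READ_EXECUTE) on the subtree
    // checkTraverse       -> (false, -,     -,     -,    -  ): exists, isDir, getFileInfo
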
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java Fri Jan  4 14:12:15 2008
@@ -26,6 +26,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 
 /**
@@ -38,15 +39,88 @@
   protected INodeDirectory parent;
   protected long modificationTime;
 
-  protected INode(String name) {
-    this(0L);
+  //Only updated by updatePermissionStatus(...).
+  //Other codes should not modify it.
+  private long permission;
+
+  private static enum PermissionStatusFormat {
+    MODE(0, 16),
+    GROUP(MODE.OFFSET + MODE.LENGTH, 25),
+    USER(GROUP.OFFSET + GROUP.LENGTH, 23);
+
+    final int OFFSET;
+    final int LENGTH; //bit length
+    final long MASK;
+
+    PermissionStatusFormat(int offset, int length) {
+      OFFSET = offset;
+      LENGTH = length;
+      MASK = ((-1L) >>> (64 - LENGTH)) << OFFSET;
+    }
+
+    long retrieve(long record) {
+      return (record & MASK) >>> OFFSET;
+    }
+
+    long combine(long bits, long record) {
+      return (record & ~MASK) | (bits << OFFSET);
+    }
+  }
+
+  protected INode(String name, PermissionStatus permissions) {
+    this(permissions, 0L);
     setLocalName(name);
   }
 
-  INode(long mTime) {
+  INode(PermissionStatus permissions, long mTime) {
     this.name = null;
     this.parent = null;
     this.modificationTime = mTime;
+    setPermissionStatus(permissions);
+  }
+
+  /** Set the {@link PermissionStatus} */
+  protected void setPermissionStatus(PermissionStatus ps) {
+    setUser(ps.getUserName());
+    setGroup(ps.getGroupName());
+    setPermission(ps.getPermission());
+  }
+  /** Get the {@link PermissionStatus} */
+  protected PermissionStatus getPermissionStatus() {
+    return new PermissionStatus(getUserName(),getGroupName(),getFsPermission());
+  }
+  private synchronized void updatePermissionStatus(
+      PermissionStatusFormat f, long n) {
+    permission = f.combine(n, permission);
+  }
+  /** Get user name */
+  protected String getUserName() {
+    int n = (int)PermissionStatusFormat.USER.retrieve(permission);
+    return SerialNumberManager.INSTANCE.getUser(n);
+  }
+  /** Set user */
+  protected void setUser(String user) {
+    int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
+    updatePermissionStatus(PermissionStatusFormat.USER, n);
+  }
+  /** Get group name */
+  protected String getGroupName() {
+    int n = (int)PermissionStatusFormat.GROUP.retrieve(permission);
+    return SerialNumberManager.INSTANCE.getGroup(n);
+  }
+  /** Set group */
+  protected void setGroup(String group) {
+    int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
+    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
+  }
+  /** Get the {@link FsPermission} */
+  protected FsPermission getFsPermission() {
+    return new FsPermission(
+        (short)PermissionStatusFormat.MODE.retrieve(permission));
+  }
+  /** Set the {@link FsPermission} of this {@link INode} */
+  protected void setPermission(FsPermission permission) {
+    updatePermissionStatus(PermissionStatusFormat.MODE, permission.toShort());
   }
 
   /**
@@ -71,6 +145,11 @@
     this.name = string2Bytes(name);
   }
 
+  /** {@inheritDoc} */
+  public String toString() {
+    return "\"" + getLocalName() + "\":" + getPermissionStatus();
+  }
+
   /**
    * Get the full absolute path name of this file (recursively computed).
    * 
@@ -231,13 +310,13 @@
 
   private List<INode> children;
 
-  INodeDirectory(String name) {
-    super(name);
+  INodeDirectory(String name, PermissionStatus permissions) {
+    super(name, permissions);
     this.children = null;
   }
 
-  INodeDirectory(long mTime) {
-    super(mTime);
+  INodeDirectory(PermissionStatus permissions, long mTime) {
+    super(permissions, mTime);
     this.children = null;
   }
 
@@ -380,19 +459,31 @@
     children.add(-low - 1, node);
     // update modification time of the parent directory
     setModificationTime(node.getModificationTime());
+    if (node.getGroupName() == null) {
+      node.setGroup(getGroupName());
+    }
     return node;
   }
 
   /**
+   * Equivalent to addNode(path, newNode, false).
+   * @see #addNode(String, INode, boolean)
+   */
+  <T extends INode> T addNode(String path, T newNode) throws FileNotFoundException {
+    return addNode(path, newNode, false);
+  }
+  /**
    * Add new INode to the file tree.
    * Find the parent and insert 
    * 
    * @param path file path
    * @param newNode INode to be added
+   * @param inheritPermission If true, copy the parent's permission to newNode.
    * @return null if the node already exists; inserted INode, otherwise
-   * @throws FileNotFoundException 
+   * @throws FileNotFoundException
    */
-  <T extends INode> T addNode(String path, T newNode) throws FileNotFoundException {
+  <T extends INode> T addNode(String path, T newNode, boolean inheritPermission
+      ) throws FileNotFoundException {
     byte[][] pathComponents = getPathComponents(path);
     assert pathComponents != null : "Incorrect path " + path;
     int pathLen = pathComponents.length;
@@ -409,6 +500,17 @@
       throw new FileNotFoundException("Parent path is not a directory: "+path);
     }
     INodeDirectory parentNode = (INodeDirectory)node;
+
+    if (inheritPermission) {
+      FsPermission p = parentNode.getFsPermission();
+      // make sure the permission has wx for the user
+      if (!p.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+        p = new FsPermission(p.getUserAction().or(FsAction.WRITE_EXECUTE),
+            p.getGroupAction(), p.getOtherAction());
+      }
+      newNode.setPermission(p);
+    }
+
     // insert into the parent children list
     newNode.name = pathComponents[pathLen-1];
     return parentNode.addChild(newNode);
@@ -466,28 +568,37 @@
 }
 
 class INodeFile extends INode {
+  static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
+
   private BlockInfo blocks[] = null;
   protected short blockReplication;
   protected long preferredBlockSize;
 
-  /**
-   */
-  INodeFile(int nrBlocks, short replication, long modificationTime,
+  INodeFile(PermissionStatus permissions,
+            int nrBlocks, short replication, long modificationTime,
             long preferredBlockSize) {
-    super(modificationTime);
-    this.blockReplication = replication;
-    this.preferredBlockSize = preferredBlockSize;
-    allocateBlocks(nrBlocks);
+    this(permissions, new BlockInfo[nrBlocks], replication,
+        modificationTime, preferredBlockSize);
   }
 
-  protected INodeFile(BlockInfo[] blklist, short replication, long modificationTime,
+  protected INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
+                      short replication, long modificationTime,
                       long preferredBlockSize) {
-    super(modificationTime);
+    super(permissions, modificationTime);
     this.blockReplication = replication;
     this.preferredBlockSize = preferredBlockSize;
     blocks = blklist;
   }
 
+  /**
+   * Set the {@link FsPermission} of this {@link INodeFile}.
+   * Since this is a file,
+   * the {@link FsAction#EXECUTE} action, if any, is ignored.
+   */
+  protected void setPermission(FsPermission permission) {
+    super.setPermission(permission.applyUMask(UMASK));
+  }
+
   boolean isDirectory() {
     return false;
   }
@@ -513,14 +624,6 @@
   }
 
   /**
-   * Allocate space for blocks.
-   * @param nrBlocks number of blocks
-   */
-  void allocateBlocks(int nrBlocks) {
-    this.blocks = new BlockInfo[nrBlocks];
-  }
-
-  /**
    * add a block to the block list
    */
   void addBlock(BlockInfo newblock) {
@@ -609,14 +712,16 @@
   protected StringBytesWritable clientMachine;
   protected DatanodeDescriptor clientNode; // if client is a cluster node too.
 
-  INodeFileUnderConstruction(short replication,
+  INodeFileUnderConstruction(PermissionStatus permissions,
+                             short replication,
                              long preferredBlockSize,
                              long modTime,
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) 
                              throws IOException {
-    super(0, replication, modTime, preferredBlockSize);
+    super(permissions.applyUMask(UMASK), 0, replication, modTime,
+        preferredBlockSize);
     this.clientName = new StringBytesWritable(clientName);
     this.clientMachine = new StringBytesWritable(clientMachine);
     this.clientNode = clientNode;
@@ -646,7 +751,8 @@
   // converts a INodeFileUnderConstruction into a INodeFile
   //
   INodeFile convertToInodeFile() {
-    INodeFile obj = new INodeFile(getBlocks(),
+    INodeFile obj = new INodeFile(getPermissionStatus(),
+                                  getBlocks(),
                                   getReplication(),
                                   getModificationTime(),
                                   getPreferredBlockSize());

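The PermissionStatusFormat enum above packs the whole PermissionStatus into a single long per inode: 16 bits of mode, then 25 bits of group serial number, then 23 bits of user serial number, with names interned through the new SerialNumberManager. A standalone sketch of the same mask arithmetic, using made-up serial numbers:

    public class PackDemo {
      static long mask(int offset, int length) {
        return ((-1L) >>> (64 - length)) << offset;           // same MASK as above
      }
      static long combine(long record, int offset, int length, long bits) {
        return (record & ~mask(offset, length)) | (bits << offset);
      }
      static long retrieve(long record, int offset, int length) {
        return (record & mask(offset, length)) >>> offset;
      }
      public static void main(String[] args) {
        long rec = 0;
        rec = combine(rec,  0, 16, 0644); // MODE  at offset 0
        rec = combine(rec, 16, 25, 7);    // GROUP serial #7 (hypothetical)
        rec = combine(rec, 41, 23, 42);   // USER  serial #42 (hypothetical)
        System.out.println("mode  = 0" + Long.toOctalString(retrieve(rec,  0, 16)));
        System.out.println("group = "   + retrieve(rec, 16, 25));
        System.out.println("user  = "   + retrieve(rec, 41, 23));
      }
    }
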
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri Jan  4 14:12:15 2008
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.StringUtils;
@@ -254,9 +255,9 @@
     return clientMachine;
   }
 
-  /**
-   */
+  /** {@inheritDoc} */
   public void create(String src, 
+                     FsPermission masked,
                              String clientName, 
                              boolean overwrite,
                              short replication,
@@ -269,8 +270,9 @@
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-    namesystem.startFile(
-        src, clientName, clientMachine, overwrite, replication, blockSize);
+    namesystem.startFile(src,
+        new PermissionStatus(Server.getUserInfo().getUserName(), null, masked),
+        clientName, clientMachine, overwrite, replication, blockSize);
     myMetrics.createFile();
   }
 
@@ -280,6 +282,18 @@
     return namesystem.setReplication(src, replication);
   }
     
+  /** {@inheritDoc} */
+  public void setPermission(String src, FsPermission permissions
+      ) throws IOException {
+    namesystem.setPermission(src, permissions);
+  }
+
+  /** {@inheritDoc} */
+  public void setOwner(String src, String username, String groupname
+      ) throws IOException {
+    namesystem.setOwner(src, username, groupname);
+  }
+
   /**
    */
   public LocatedBlock addBlock(String src, 
@@ -389,15 +403,15 @@
             srcPath.depth() <= MAX_PATH_DEPTH);
   }
     
-  /**
-   */
-  public boolean mkdirs(String src) throws IOException {
+  /** {@inheritDoc} */
+  public boolean mkdirs(String src, FsPermission masked) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
     if (!checkPathLength(src)) {
       throw new IOException("mkdirs: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-    return namesystem.mkdirs(src);
+    return namesystem.mkdirs(src,
+        new PermissionStatus(Server.getUserInfo().getUserName(), null, masked));
   }
 
   /**

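The pattern in create() and mkdirs() above: the client sends a mode that
already has the client-side umask applied ("masked"), and the name-node pairs
it with the calling user's name. The group is passed as null here, presumably
to be filled in later (for example from the parent directory); the diff does
not show that step. A rough standalone sketch of the flow, with hypothetical
class and method names:

    public class CreateFlowSketch {
      // Stand-in for org.apache.hadoop.fs.permission.PermissionStatus.
      static class PermissionStatus {
        final String user, group; final short mode;
        PermissionStatus(String user, String group, short mode) {
          this.user = user; this.group = group; this.mode = mode;
        }
        public String toString() {
          return user + ":" + group + ":" + Integer.toOctalString(mode);
        }
      }

      // Name-node side: attach the RPC caller's identity to the masked mode.
      static PermissionStatus stamp(String callerUser, short masked) {
        return new PermissionStatus(callerUser, null /* set later */, masked);
      }

      public static void main(String[] args) {
        short dflt = 0777, umask = 0022;
        short masked = (short) (dflt & ~umask);     // client applies its umask
        System.out.println(stamp("alice", masked)); // alice:null:755
      }
    }
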
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java Fri Jan  4 14:12:15 2008
@@ -68,6 +68,7 @@
   public static final int FIXING_DELETE = 2;
   
   private NameNode nn;
+  private ClientProtocol namenodeproxy;
   private String lostFound = null;
   private boolean lfInited = false;
   private boolean lfInitedOk = false;
@@ -95,6 +96,7 @@
                       HttpServletResponse response) throws IOException {
     this.conf = conf;
     this.nn = nn;
+    this.namenodeproxy = DFSClient.createNamenode(nn.getNameNodeAddress(), conf);
     this.out = response.getWriter();
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -114,7 +116,7 @@
    */
   public void fsck() throws IOException {
     try {
-      DFSFileInfo[] files = nn.getListing(path);
+      DFSFileInfo[] files = namenodeproxy.getListing(path);
       FsckResult res = new FsckResult();
       res.setReplication((short) conf.getInt("dfs.replication", 3));
       if (files != null) {
@@ -144,7 +146,7 @@
         out.println(file.getPath().toString() + " <dir>");
       }
       res.totalDirs++;
-      DFSFileInfo[] files = nn.getListing(file.getPath().toString());
+      DFSFileInfo[] files = namenodeproxy.getListing(file.getPath().toString());
       for (int i = 0; i < files.length; i++) {
         check(files[i], res);
       }
@@ -153,8 +155,8 @@
     res.totalFiles++;
     long fileLen = file.getLen();
     res.totalSize += fileLen;
-    LocatedBlocks blocks = nn.getBlockLocations(file.getPath().toString(),
-        0, fileLen);
+    LocatedBlocks blocks = namenodeproxy.getBlockLocations(
+        file.getPath().toString(), 0, fileLen);
     res.totalBlocks += blocks.locatedBlockCount();
     if (showFiles) {
       out.print(file.getPath().toString() + " " + fileLen + " bytes, " +
@@ -248,7 +250,7 @@
         lostFoundMove(file, blocks);
         break;
       case FIXING_DELETE:
-        nn.delete(file.getPath().toString());
+        namenodeproxy.delete(file.getPath().toString());
       }
     }
     if (showFiles) {
@@ -276,7 +278,7 @@
     String target = lostFound + file.getPath();
     String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
     try {
-      if (!nn.mkdirs(target)) {
+      if (!namenodeproxy.mkdirs(target, file.getPermission())) {
         LOG.warn(errmsg);
         return;
       }

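The recurring change in NamenodeFsck above is that fsck no longer invokes
NameNode methods directly but goes through a ClientProtocol proxy to its own
server, so its namespace operations traverse the same entry points as any
external client and, presumably, the same permission checks. A toy sketch of
that loopback-proxy idea; the interface and check below are stand-ins, not the
patch's API:

    public class LoopbackProxySketch {
      interface ClientProtocol {
        String[] getListing(String path);
      }

      static class NameNode implements ClientProtocol {
        private final String caller;
        NameNode(String caller) { this.caller = caller; }
        public String[] getListing(String path) {
          checkPermission(caller, path);   // every entry point is checked
          return new String[] { path + "/a", path + "/b" };
        }
        private void checkPermission(String user, String path) {
          if (user == null) throw new SecurityException("no caller identity");
        }
      }

      public static void main(String[] args) {
        ClientProtocol namenodeproxy = new NameNode("fsck"); // like an RPC proxy
        for (String f : namenodeproxy.getListing("/user"))
          System.out.println(f);
      }
    }
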
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PermissionChecker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PermissionChecker.java?rev=609029&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PermissionChecker.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PermissionChecker.java Fri Jan  4 14:12:15 2008
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** Perform permission checking in {@link FSNamesystem}. */
+class PermissionChecker {
+  static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+  /** Anonymous PermissionStatus. */
+  static final PermissionStatus ANONYMOUS = PermissionStatus.createImmutable(
+      "HadoopAnonymous", "HadoopAnonymous", new FsPermission((short)0777));
+
+  final String user;
+  private final Set<String> groups = new HashSet<String>();
+  final boolean isSuper;
+
+  PermissionChecker(String fsOwner, String supergroup
+      ) throws AccessControlException {
+    UserGroupInformation ugi = Server.getUserInfo();
+    LOG.debug("ugi=" + ugi);
+
+    if (ugi != null) {
+      user = ugi.getUserName();
+      groups.addAll(Arrays.asList(ugi.getGroupNames()));
+      isSuper = user.equals(fsOwner) || groups.contains(supergroup);
+    }
+    else {
+      throw new AccessControlException("ugi = null");
+    }
+  }
+
+  boolean containsGroup(String group) {return groups.contains(group);}
+
+  /**
+   * Check whether the current user has permission to access the path.
+   * Traverse (execute) permission is always checked along the path.
+   *
+   * The parent path is the parent directory of the path.
+   * The ancestor path is the last (i.e. the closest) existing ancestor
+   * directory of the path.
+   * Note that if the parent path exists,
+   * then the parent path and the ancestor path are the same.
+   *
+   * For example, suppose the path is "/foo/bar/baz".
+   * Whether baz is a file or a directory,
+   * the parent path is "/foo/bar".
+   * If bar exists, then the ancestor path is also "/foo/bar".
+   * If bar does not exist but foo exists,
+   * then the ancestor path is "/foo".
+   * Further, if neither foo nor bar exists,
+   * then the ancestor path is "/".
+   *
+   * @param doCheckOwner whether to require the user to be the owner of the path
+   * @param ancestorAccess the access required of the ancestor of the path
+   * @param parentAccess the access required of the parent of the path
+   * @param access the access required of the path itself
+   * @param subAccess if the path is a directory, the access additionally
+   * required of the path and all its sub-directories; ignored otherwise
+   * @throws AccessControlException if a required access is denied
+   */
+  void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
+      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+      FsAction subAccess) throws AccessControlException {
+    LOG.debug("ACCESS CHECK: " + this
+        + ", doCheckOwner=" + doCheckOwner
+        + ", ancestorAccess=" + ancestorAccess
+        + ", parentAccess=" + parentAccess
+        + ", access=" + access
+        + ", subAccess=" + subAccess);
+
+    synchronized(root) {
+      INode[] inodes = root.getExistingPathINodes(path);
+      int ancestorIndex = inodes.length - 2;
+      for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+          ancestorIndex--);
+      checkTraverse(inodes, ancestorIndex);
+
+      if (ancestorAccess != null && inodes.length > 1) {
+        check(inodes, ancestorIndex, ancestorAccess);
+      }
+      if (parentAccess != null && inodes.length > 1) {
+        check(inodes, inodes.length - 2, parentAccess);
+      }
+      if (access != null) {
+        check(inodes[inodes.length - 1], access);
+      }
+      if (subAccess != null) {
+        checkSubAccess(inodes[inodes.length - 1], subAccess);
+      }
+      if (doCheckOwner) {
+        checkOwner(inodes[inodes.length - 1]);
+      }
+    }
+  }
+
+  private void checkOwner(INode inode) throws AccessControlException {
+    if (inode != null && user.equals(inode.getUserName())) {
+      return;
+    }
+    throw new AccessControlException("Permission denied");
+  }
+
+  private void checkTraverse(INode[] inodes, int last
+      ) throws AccessControlException {
+    for(int j = 0; j <= last; j++) {
+      check(inodes[j], FsAction.EXECUTE);
+    }
+  }
+
+  private void checkSubAccess(INode inode, FsAction access
+      ) throws AccessControlException {
+    if (inode == null || !inode.isDirectory()) {
+      return;
+    }
+
+    Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
+    for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
+      INodeDirectory d = directories.pop();
+      check(d, access);
+
+      for(INode child : d.getChildren()) {
+        if (child.isDirectory()) {
+          directories.push((INodeDirectory)child);
+        }
+      }
+    }
+  }
+
+  private void check(INode[] inodes, int i, FsAction access
+      ) throws AccessControlException {
+    check(i >= 0? inodes[i]: null, access);
+  }
+
+  private void check(INode inode, FsAction access
+      ) throws AccessControlException {
+    if (inode == null) {
+      return;
+    }
+    FsPermission mode = inode.getFsPermission();
+
+    if (user.equals(inode.getUserName())) { //user class
+      if (mode.getUserAction().implies(access)) { return; }
+    }
+    else if (groups.contains(inode.getGroupName())) { //group class
+      if (mode.getGroupAction().implies(access)) { return; }
+    }
+    else { //other class
+      if (mode.getOtherAction().implies(access)) { return; }
+    }
+    throw new AccessControlException("Permission denied: user=" + user
+        + ", access=" + access + ", inode=" + inode);
+  }
+}

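A standalone restatement of the class-selection rule in check() may help:
exactly one of the user, group, and other permission classes is consulted (the
first that applies), so, as in POSIX, an owner who lacks a bit is denied even
if the group or other bits would grant it. The names below are illustrative
only:

    import java.util.Set;

    public class CheckSketch {
      // need is an rwx bit mask: 4 = read, 2 = write, 1 = execute.
      static boolean implies(short mode, int shift, int need) {
        return ((mode >> shift) & need) == need;
      }

      static boolean permitted(String user, Set<String> groups,
          String owner, String group, short mode, int need) {
        if (user.equals(owner))     return implies(mode, 6, need); // user class
        if (groups.contains(group)) return implies(mode, 3, need); // group class
        return implies(mode, 0, need);                             // other class
      }

      public static void main(String[] args) {
        // rw-r----- (0640): a non-owner group member may read but not write.
        Set<String> bobsGroups = Set.of("staff");
        System.out.println(permitted("bob", bobsGroups, "alice", "staff",
            (short) 0640, 4)); // true
        System.out.println(permitted("bob", bobsGroups, "alice", "staff",
            (short) 0640, 2)); // false
      }
    }
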
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SerialNumberManager.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SerialNumberManager.java?rev=609029&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SerialNumberManager.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SerialNumberManager.java Fri Jan  4 14:12:15 2008
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+/** Manage name-to-serial-number maps for users and groups. */
+class SerialNumberManager {
+  /** This is the only instance of {@link SerialNumberManager}.*/
+  static final SerialNumberManager INSTANCE = new SerialNumberManager();
+
+  private SerialNumberMap<String> usermap = new SerialNumberMap<String>();
+  private SerialNumberMap<String> groupmap = new SerialNumberMap<String>();
+
+  private SerialNumberManager() {}
+
+  int getUserSerialNumber(String u) {return usermap.get(u);}
+  int getGroupSerialNumber(String g) {return groupmap.get(g);}
+  String getUser(int n) {return usermap.get(n);}
+  String getGroup(int n) {return groupmap.get(n);}
+
+  // Instance initializer: reserve serial number 0 for the null
+  // (i.e. not-yet-set) user and group names.
+  {
+    getUserSerialNumber(null);
+    getGroupSerialNumber(null);
+  }
+
+  private static class SerialNumberMap<T> {
+    private int max = 0;
+    private int nextSerialNumber() {return max++;}
+
+    private Map<T, Integer> t2i = new HashMap<T, Integer>();
+    private Map<Integer, T> i2t = new HashMap<Integer, T>();
+
+    synchronized int get(T t) {
+      Integer sn = t2i.get(t);
+      if (sn == null) {
+        sn = nextSerialNumber();
+        t2i.put(t, sn);
+        i2t.put(sn, t);
+      }
+      return sn;
+    }
+
+    synchronized T get(int i) {
+      if (!i2t.containsKey(i)) {
+        throw new IllegalStateException("!i2t.containsKey(" + i
+            + "), this=" + this);
+      }
+      return i2t.get(i);
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return "max=" + max + ",\n  t2i=" + t2i + ",\n  i2t=" + i2t;
+    }
+  }
+}
\ No newline at end of file

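The point of SerialNumberManager is space: each distinct user and group name
is interned once, and every inode stores a small integer instead of the
string. A minimal re-implementation of the bidirectional map, for illustration
only:

    import java.util.HashMap;
    import java.util.Map;

    public class SerialNumberMapSketch<T> {
      private int max = 0;
      private final Map<T, Integer> t2i = new HashMap<>();
      private final Map<Integer, T> i2t = new HashMap<>();

      // Return the serial number for t, assigning the next one on first use.
      public synchronized int get(T t) {
        Integer sn = t2i.get(t);
        if (sn == null) {
          sn = max++;
          t2i.put(t, sn);
          i2t.put(sn, t);
        }
        return sn;
      }

      // Reverse lookup; a miss indicates a bug, hence the exception.
      public synchronized T get(int i) {
        if (!i2t.containsKey(i))
          throw new IllegalStateException("unknown serial number: " + i);
        return i2t.get(i);
      }

      public static void main(String[] args) {
        SerialNumberMapSketch<String> users = new SerialNumberMapSketch<>();
        System.out.println(users.get("alice")); // 0
        System.out.println(users.get("bob"));   // 1
        System.out.println(users.get("alice")); // 0 again: interned
        System.out.println(users.get(1));       // bob
      }
    }
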
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/FsPermission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/FsPermission.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/FsPermission.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/FsPermission.java Fri Jan  4 14:12:15 2008
@@ -35,6 +35,18 @@
     WritableFactories.setFactory(FsPermission.class, FACTORY);
   }
 
+  /** Create an immutable {@link FsPermission} object. */
+  public static FsPermission createImmutable(short permission) {
+    return new FsPermission(permission) {
+      public FsPermission applyUMask(FsPermission umask) {
+        throw new UnsupportedOperationException();
+      }
+      public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
   //POSIX permission style
   private FsAction useraction = null;
   private FsAction groupaction = null;

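createImmutable() freezes an instance by returning an anonymous subclass whose
mutators all throw. A compact sketch of the idiom with stand-in types (Mode
here is hypothetical):

    public class ImmutableSketch {
      static class Mode {
        protected short bits;
        Mode(short bits) { this.bits = bits; }
        public void set(short bits) { this.bits = bits; }  // the lone mutator
        public short get() { return bits; }
      }

      // Freeze an instance by overriding every mutator to throw.
      static Mode immutable(short bits) {
        return new Mode(bits) {
          @Override public void set(short b) {
            throw new UnsupportedOperationException();
          }
        };
      }

      public static void main(String[] args) {
        Mode m = immutable((short) 0644);
        System.out.println(Integer.toOctalString(m.get())); // 644
        try {
          m.set((short) 0777);
        } catch (UnsupportedOperationException expected) {
          System.out.println("immutable, as intended");
        }
      }
    }
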
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/PermissionStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/PermissionStatus.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/PermissionStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/permission/PermissionStatus.java Fri Jan  4 14:12:15 2008
@@ -34,6 +34,19 @@
     WritableFactories.setFactory(PermissionStatus.class, FACTORY);
   }
 
+  /** Create an immutable {@link PermissionStatus} object. */
+  public static PermissionStatus createImmutable(
+      String user, String group, FsPermission permission) {
+    return new PermissionStatus(user, group, permission) {
+      public PermissionStatus applyUMask(FsPermission umask) {
+        throw new UnsupportedOperationException();
+      }
+      public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
   private String username;
   private String groupname;
   private FsPermission permission;
@@ -55,6 +68,15 @@
 
   /** Return permission */
   public FsPermission getPermission() {return permission;}
+
+  /**
+   * Apply umask.
+   * @see FsPermission#applyUMask(FsPermission)
+   */
+  public PermissionStatus applyUMask(FsPermission umask) {
+    permission = permission.applyUMask(umask);
+    return this;
+  }
 
   /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Fri Jan  4 14:12:15 2008
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 
@@ -45,6 +44,8 @@
 import java.util.Iterator;
 import java.util.Random;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,9 +53,9 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.ipc.SocketChannelOutputStream;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -125,7 +126,16 @@
    */
   public static UserGroupInformation getUserInfo() {
     Call call = CurCall.get();
-    return (call == null) ? null : call.connection.ticket;
+    if (call != null)
+      return call.connection.ticket;
+    // This supports local calls (as opposed to RPC ones) to the name-node.
+    // It is currently name-node specific and should be moved elsewhere.
+    try {
+      return UnixUserGroupInformation.login();
+    } catch(LoginException le) {
+      LOG.info(StringUtils.stringifyException(le));
+      return null;
+    }
   }
   
   private String bindAddress; 

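The getUserInfo() change layers a local fallback under the RPC path: identity
normally comes from the per-call connection ticket, stashed in a thread-local
by the server, and only when there is no active call does the server fall back
to a local login. A standalone sketch of the two-level lookup; the names are
stand-ins, not Hadoop's:

    public class CallerIdentitySketch {
      // Set by the request handler for the duration of each RPC call.
      static final ThreadLocal<String> CUR_CALLER = new ThreadLocal<>();

      static String getUserInfo() {
        String rpcCaller = CUR_CALLER.get();
        if (rpcCaller != null)
          return rpcCaller;                      // inside an RPC handler
        return System.getProperty("user.name");  // local (non-RPC) call
      }

      public static void main(String[] args) {
        System.out.println(getUserInfo());       // local fallback
        CUR_CALLER.set("alice");
        System.out.println(getUserInfo());       // alice, as if in an RPC
      }
    }
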
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java Fri Jan  4 14:12:15 2008
@@ -110,12 +110,6 @@
     return userName;
   }
 
-  /** Return the default  group name
-   */
-  public String getDefaultGroupName() {
-    return groupNames[0];
-  }
-
   /* The following two methods implements Writable interface */
   final private static String UGI_TECHNOLOGY = "STRING_UGI"; 
   /** Deserialize this object
@@ -218,13 +212,14 @@
     return currentUGI;
   }
   
-  /* Get current user's name and the names of all its groups from Unix.
+  /**
+   * Get current user's name and the names of all its groups from Unix.
    * It's assumed that there is only one UGI per user. If this user already
    * has a UGI in the ugi map, return the ugi in the map.
    * Otherwise get the current user's information from Unix, store it
    * in the map, and return it.
    */
-  private static UnixUserGroupInformation login() throws LoginException {
+  public static UnixUserGroupInformation login() throws LoginException {
     try {
       String userName =  getUnixUserName();
 
@@ -263,6 +258,7 @@
     UnixUserGroupInformation ugi = readFromConf(conf, UGI_PROPERTY_NAME);
     if (ugi == null) {
       ugi = login();
+      LOG.debug("Unix Login: " + ugi);
     } 
     return ugi;
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java Fri Jan  4 14:12:15 2008
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.security;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 
 /** A {@link Writable} interface for storing user and groups information.
  */
 public interface UserGroupInformation extends Writable {
+  public static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
   /** Get username
    * 
    * @return the user's name
@@ -33,10 +37,4 @@
    * @return an array of group names
    */
   public String[] getGroupNames();
-  
-  /** Get the default group name.
-   * 
-   * @return the default the group name
-   */
-  public String getDefaultGroupName();
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java Fri Jan  4 14:12:15 2008
@@ -9,6 +9,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.StringUtils;
@@ -443,8 +444,8 @@
     throws IOException {
       long start = System.currentTimeMillis();
       // dummyActionNoSynch(fileIdx);
-      nameNode.create(fileNames[daemonId][inputIdx], clientName, 
-                      true, replication, BLOCK_SIZE);
+      nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+                      clientName, true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       return end-start;
     }
@@ -656,7 +657,8 @@
       String clientName = getClientName(007);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName();
-        nameNode.create(fileName, clientName, true, replication, BLOCK_SIZE);
+        nameNode.create(fileName, FsPermission.getDefault(),
+                        clientName, true, replication, BLOCK_SIZE);
         addBlocks(fileName, clientName);
         nameNode.complete(fileName, clientName);
       }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java Fri Jan  4 14:12:15 2008
@@ -79,7 +79,7 @@
       long fileLen = size/replicationFactor;
       createFile(fileLen, replicationFactor);
 
-      List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+      List<LocatedBlock> locatedBlocks = client.
       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
 
       int numOfBlocks = locatedBlocks.size();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Fri Jan  4 14:12:15 2008
@@ -93,7 +93,11 @@
           DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, r.nextLong());
       DFSTestUtil.waitReplication(fs,fileName, REPLICATION_FACTOR);
       
-      List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+      // get all datanodes
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      DFSClient client = new DFSClient(addr, CONF);
+      List<LocatedBlock> locatedBlocks = client.namenode.
         getBlockLocations("/tmp.txt", 0, DEFAULT_BLOCK_SIZE).getLocatedBlocks();
       assertEquals(1, locatedBlocks.size());
       LocatedBlock block = locatedBlocks.get(0);
@@ -105,10 +109,6 @@
       cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
       cluster.waitActive();
       
-      // get all datanodes
-      InetSocketAddress addr = new InetSocketAddress("localhost",
-          cluster.getNameNodePort());
-      DFSClient client = new DFSClient(addr, CONF);
       DatanodeInfo[] datanodes = client.datanodeReport(DatanodeReportType.ALL);
 
       // find out the new node
@@ -157,7 +157,7 @@
       // block locations should contain two proxies and newNode
       checkBlocks(new DatanodeInfo[]{newNode, proxies.get(0), proxies.get(1)},
           fileName.toString(), 
-          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR);
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
       // case 4: proxies.get(0) is not a valid del hint
       LOG.info("Testcase 4: invalid del hint " + proxies.get(0).getName() );
       assertTrue(replaceBlock(b, proxies.get(1), proxies.get(0), source));
@@ -166,7 +166,7 @@
        */
       checkBlocks(proxies.toArray(new DatanodeInfo[proxies.size()]), 
           fileName.toString(), 
-          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR);
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
     } finally {
       cluster.shutdown();
     }
@@ -174,14 +174,14 @@
   
   /* check if file's blocks exist at includeNodes */
   private void checkBlocks(DatanodeInfo[] includeNodes, String fileName, 
-      long fileLen, short replFactor) throws IOException {
+      long fileLen, short replFactor, DFSClient client) throws IOException {
     Boolean notDone;
     do {
       try {
         Thread.sleep(100);
       } catch(InterruptedException e) {
       }
-      List<LocatedBlock> blocks = cluster.getNameNode().
+      List<LocatedBlock> blocks = client.namenode.
       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
       assertEquals(1, blocks.size());
       DatanodeInfo[] nodes = blocks.get(0).getLocations();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Fri Jan  4 14:12:15 2008
@@ -24,6 +24,7 @@
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
 
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.UTF8;
@@ -57,8 +58,11 @@
 
     // add a bunch of transactions.
     public void run() {
+      PermissionStatus p = FSNamesystem.getFSNamesystem()
+          .createFsOwnerPermissions(new FsPermission((short)0777));
+
       for (int i = 0; i < numTransactions; i++) {
-        INodeFile inode = new INodeFile(0, replication, 0, blockSize);
+        INodeFile inode = new INodeFile(p, 0, replication, 0, blockSize);
         editLog.logCreateFile("/filename" + i, inode);
         editLog.logSync();
       }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java Fri Jan  4 14:12:15 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
 
 import junit.framework.TestCase;
 /**
@@ -69,7 +70,9 @@
       DatanodeInfo[] dataNodes=null;
       boolean notWritten;
       do {
-        locatedBlocks = cluster.getNameNode().
+        DFSClient dfsclient = new DFSClient(
+            NetUtils.createSocketAddr(CONF.get("fs.default.name")), CONF);
+        locatedBlocks = dfsclient.namenode.
           getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
         assertEquals(2, locatedBlocks.size());
         notWritten = false;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=609029&r1=609028&r2=609029&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Fri Jan  4 14:12:15 2008
@@ -64,11 +64,8 @@
   private void checkFile(FileSystem fileSys, Path name, int repl)
     throws IOException {
     Configuration conf = fileSys.getConf();
-    ClientProtocol namenode = (ClientProtocol) RPC.getProxy(
-                      ClientProtocol.class,
-                      ClientProtocol.versionID,
-                      NetUtils.createSocketAddr(conf.get("fs.default.name")), 
-                      conf);
+    ClientProtocol namenode = DFSClient.createNamenode(
+        NetUtils.createSocketAddr(conf.get("fs.default.name")), conf);
       
     LocatedBlocks locations;
     boolean isReplicationDone;


