hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1449976 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/ja...
Date Tue, 26 Feb 2013 00:10:18 GMT
Author: szetszwo
Date: Tue Feb 26 00:10:17 2013
New Revision: 1449976

URL: http://svn.apache.org/r1449976
Log:
Merge r1448505 through r1449957 from trunk.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1448505-1449957

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Feb 26 00:10:17 2013
@@ -310,6 +310,12 @@ Release 2.0.4-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 
+    configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
+
+    HDFS-4304. Make FSEditLogOp.MAX_OP_SIZE configurable. (Colin Patrick
+    McCabe via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -324,6 +330,9 @@ Release 2.0.4-beta - UNRELEASED
     but not in dfs.namenode.edits.dir are silently ignored.  (Arpit Agarwal
     via szetszwo)
 
+    HDFS-4482. ReplicationMonitor thread can exit with NPE due to the race 
+    between delete and replication of same file. (umamahesh)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Tue Feb 26 00:10:17 2013
@@ -68,8 +68,9 @@ if (NOT GENERATED_JAVAH)
     MESSAGE(FATAL_ERROR "You must set the CMake variable GENERATED_JAVAH")
 endif (NOT GENERATED_JAVAH)
 
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2 -D_GNU_SOURCE")
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_FILE_OFFSET_BITS=64")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
 
 include_directories(
     ${GENERATED_JAVAH}

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java Tue Feb 26 00:10:17 2013
@@ -163,6 +163,11 @@ class BookKeeperEditLogInputStream exten
     return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
   }
 
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    reader.setMaxOpSize(maxOpSize);
+  }
+
   /**
    * Input stream implementation which can be used by 
    * FSEditLogOp.Reader

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Tue Feb 26 00:10:17 2013
@@ -56,6 +56,14 @@ fi
 COMMAND=$1
 shift
 
+case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+esac
+
 # Determine if we're starting a secure datanode, and if so, redefine appropriate variables
 if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
   if [ -n "$JSVC_HOME" ]; then

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1448505-1449957

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Feb 26 00:10:17 2013
@@ -392,6 +392,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT = 1;
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+  public static final String  DFS_NAMENODE_MAX_OP_SIZE_KEY = "dfs.namenode.max.op.size";
+  public static final int     DFS_NAMENODE_MAX_OP_SIZE_DEFAULT = 50 * 1024 * 1024;
   
   public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Tue Feb 26 00:10:17 2013
@@ -142,4 +142,9 @@ class EditLogBackupInputStream extends E
   public boolean isInProgress() {
     return true;
   }
+
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    reader.setMaxOpSize(maxOpSize);
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Tue Feb 26 00:10:17 2013
@@ -32,6 +32,7 @@ import java.security.PrivilegedException
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
@@ -53,6 +54,7 @@ public class EditLogFileInputStream exte
   private final long firstTxId;
   private final long lastTxId;
   private final boolean isInProgress;
+  private int maxOpSize;
   static private enum State {
     UNINIT,
     OPEN,
@@ -118,6 +120,7 @@ public class EditLogFileInputStream exte
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
     this.isInProgress = isInProgress;
+    this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
   }
 
   private void init() throws LogHeaderCorruptException, IOException {
@@ -134,6 +137,7 @@ public class EditLogFileInputStream exte
         throw new LogHeaderCorruptException("No header found in log");
       }
       reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+      reader.setMaxOpSize(maxOpSize);
       state = State.OPEN;
     } finally {
       if (reader == null) {
@@ -412,5 +416,12 @@ public class EditLogFileInputStream exte
       return url.toString();
     }
   }
-  
+
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    this.maxOpSize = maxOpSize;
+    if (reader != null) {
+      reader.setMaxOpSize(maxOpSize);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Tue Feb 26 00:10:17 2013
@@ -165,4 +165,9 @@ public abstract class EditLogInputStream
    * Return true if this stream is in progress, false if it is finalized.
    */
   public abstract boolean isInProgress();
+  
+  /**
+   * Set the maximum opcode size in bytes.
+   */
+  public abstract void setMaxOpSize(int maxOpSize);
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Feb 26 00:10:17 2013
@@ -1556,7 +1556,12 @@ public class FSDirectory implements Clos
 
     // fill up the inodes in the path from this inode to root
     for (int i = 0; i < depth; i++) {
-      inodes[depth - i - 1] = inode;
+      if (inode == null) {
+        NameNode.stateChangeLog.warn("Could not get full path."
+            + " Corresponding file might have deleted already.");
+        return null;
+      }
+      inodes[depth-i-1] = inode;
       inode = inode.parent;
     }
     return inodes;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Feb 26 00:10:17 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
 import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
@@ -75,11 +76,6 @@ import java.io.EOFException;
 public abstract class FSEditLogOp {
   public final FSEditLogOpCodes opCode;
   long txid;
-  /**
-   * Opcode size is limited to 1.5 megabytes
-   */
-  public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
-
 
   @SuppressWarnings("deprecation")
   final public static class OpInstanceCache {
@@ -2555,6 +2551,7 @@ public abstract class FSEditLogOp {
     private final int logVersion;
     private final Checksum checksum;
     private final OpInstanceCache cache;
+    private int maxOpSize;
 
     /**
      * Construct the reader
@@ -2562,7 +2559,8 @@ public abstract class FSEditLogOp {
      * @param logVersion The version of the data coming from the stream.
      */
     @SuppressWarnings("deprecation")
-    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
+    public Reader(DataInputStream in, StreamLimiter limiter,
+        int logVersion) {
       this.logVersion = logVersion;
       if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         this.checksum = new PureJavaCrc32();
@@ -2578,6 +2576,11 @@ public abstract class FSEditLogOp {
       }
       this.limiter = limiter;
       this.cache = new OpInstanceCache();
+      this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
+    }
+
+    public void setMaxOpSize(int maxOpSize) {
+      this.maxOpSize = maxOpSize;
     }
 
     /**
@@ -2672,8 +2675,8 @@ public abstract class FSEditLogOp {
      * problematic byte.  This usually means the beginning of the opcode.
      */
     private FSEditLogOp decodeOp() throws IOException {
-      limiter.setLimit(MAX_OP_SIZE);
-      in.mark(MAX_OP_SIZE);
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
 
       if (checksum != null) {
         checksum.reset();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Feb 26 00:10:17 2013
@@ -608,6 +608,12 @@ public class FSImage implements Closeabl
       editStreams = FSImagePreTransactionalStorageInspector
         .getEditLogStreams(storage);
     }
+    int maxOpSize = conf.getInt(DFSConfigKeys.
+          DFS_NAMENODE_MAX_OP_SIZE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
+    for (EditLogInputStream elis : editStreams) {
+      elis.setMaxOpSize(maxOpSize);
+    }
  
     LOG.debug("Planning to load image :\n" + imageFile);
     for (EditLogInputStream l : editStreams) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Feb 26 00:10:17 2013
@@ -308,6 +308,7 @@ public class FSNamesystem implements Nam
   private final boolean isPermissionEnabled;
   private final boolean persistBlocks;
   private final UserGroupInformation fsOwner;
+  private final String fsOwnerShortUserName;
   private final String supergroup;
   private final boolean standbyShouldCheckpoint;
   
@@ -538,6 +539,7 @@ public class FSNamesystem implements Nam
       this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
 
       this.fsOwner = UserGroupInformation.getCurrentUser();
+      this.fsOwnerShortUserName = fsOwner.getShortUserName();
       this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
       this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
@@ -1121,9 +1123,9 @@ public class FSNamesystem implements Nam
    * Dump all metadata into specified file
    */
   void metaSave(String filename) throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
-      checkSuperuserPrivilege();
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(
           new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
@@ -1200,6 +1202,7 @@ public class FSNamesystem implements Nam
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1207,7 +1210,7 @@ public class FSNamesystem implements Nam
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
-      checkOwner(src);
+      checkOwner(pc, src);
       dir.setPermission(src, permission);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
@@ -1246,6 +1249,7 @@ public class FSNamesystem implements Nam
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1253,14 +1257,13 @@ public class FSNamesystem implements Nam
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
-      FSPermissionChecker pc = checkOwner(src);
-      if (!pc.isSuper) {
-        if (username != null && !pc.user.equals(username)) {
-          throw new AccessControlException("Non-super user cannot change owner.");
+      checkOwner(pc, src);
+      if (!pc.isSuperUser()) {
+        if (username != null && !pc.getUser().equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner");
         }
         if (group != null && !pc.containsGroup(group)) {
-          throw new AccessControlException("User does not belong to " + group
-            + " .");
+          throw new AccessControlException("User does not belong to " + group);
         }
       }
       dir.setOwner(src, username, group);
@@ -1310,8 +1313,9 @@ public class FSNamesystem implements Nam
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     try {
-      return getBlockLocationsInt(src, offset, length, doAccessTime,
+      return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
       if (isAuditEnabled() && isExternalInvocation()) {
@@ -1323,11 +1327,12 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private LocatedBlocks getBlockLocationsInt(String src, long offset, long length,
-      boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
+  private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc,
+      String src, long offset, long length, boolean doAccessTime,
+      boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
     if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.READ);
+      checkPathAccess(pc, src, FsAction.READ);
     }
 
     if (offset < 0) {
@@ -1461,13 +1466,14 @@ public class FSNamesystem implements Nam
     }
 
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
-      concatInternal(target, srcs);
+      concatInternal(pc, target, srcs);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target, false);
       }
@@ -1483,18 +1489,18 @@ public class FSNamesystem implements Nam
   }
 
   /** See {@link #concat(String, String[])} */
-  private void concatInternal(String target, String [] srcs) 
+  private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
 
     // write permission for the target
     if (isPermissionEnabled) {
-      checkPathAccess(target, FsAction.WRITE);
+      checkPathAccess(pc, target, FsAction.WRITE);
 
       // and srcs
       for(String aSrc: srcs) {
-        checkPathAccess(aSrc, FsAction.READ); // read the file
-        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete 
       }
     }
 
@@ -1616,13 +1622,14 @@ public class FSNamesystem implements Nam
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
       final INodesInPath iip = dir.getINodesInPath4Write(src);
       final INode inode = iip.getLastINode();
@@ -1664,6 +1671,7 @@ public class FSNamesystem implements Nam
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1671,7 +1679,7 @@ public class FSNamesystem implements Nam
       if (!createParent) {
         verifyParentDir(link);
       }
-      createSymlinkInternal(target, link, dirPerms, createParent);
+      createSymlinkInternal(pc, target, link, dirPerms, createParent);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(link, false);
       }
@@ -1689,8 +1697,8 @@ public class FSNamesystem implements Nam
   /**
    * Create a symbolic link.
    */
-  private void createSymlinkInternal(String target, String link,
-      PermissionStatus dirPerms, boolean createParent)
+  private void createSymlinkInternal(FSPermissionChecker pc, String target,
+      String link, PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1708,7 +1716,7 @@ public class FSNamesystem implements Nam
           +" either because the filename is invalid or the file exists");
     }
     if (isPermissionEnabled) {
-      checkAncestorAccess(link, FsAction.WRITE);
+      checkAncestorAccess(pc, link, FsAction.WRITE);
     }
     // validate that we have enough inodes.
     checkFsObjectLimit();
@@ -1747,17 +1755,16 @@ public class FSNamesystem implements Nam
   private boolean setReplicationInt(final String src, final short replication)
       throws IOException {
     blockManager.verifyReplication(src, replication, null);
-
     final boolean isFile;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
 
       final short[] oldReplication = new short[1];
@@ -1781,11 +1788,12 @@ public class FSNamesystem implements Nam
 
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
-        checkTraverse(filename);
+        checkTraverse(pc, filename);
       }
       return dir.getPreferredBlockSize(filename);
     } finally {
@@ -1846,11 +1854,11 @@ public class FSNamesystem implements Nam
       FileNotFoundException, ParentNotDirectoryException, IOException {
     boolean skipSync = false;
     final HdfsFileStatus stat;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      startFileInternal(src, permissions, holder, clientMachine, flag,
+      startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
       stat = dir.getFileInfo(src, false);
     } catch (StandbyException se) {
@@ -1889,7 +1897,7 @@ public class FSNamesystem implements Nam
    * 
    * @return the last block locations if the block is partial or null otherwise
    */
-  private LocatedBlock startFileInternal(String src,
+  private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
@@ -1923,9 +1931,9 @@ public class FSNamesystem implements Nam
     boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
       if (append || (overwrite && myFile != null)) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       } else {
-        checkAncestorAccess(src, FsAction.WRITE);
+        checkAncestorAccess(pc, src, FsAction.WRITE);
       }
     }
 
@@ -2041,6 +2049,7 @@ public class FSNamesystem implements Nam
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
     boolean skipSync = false;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2058,7 +2067,7 @@ public class FSNamesystem implements Nam
         return true;
       }
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
   
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
@@ -2181,11 +2190,12 @@ public class FSNamesystem implements Nam
           DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
     }
     LocatedBlock lb = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
-      lb = startFileInternal(src, null, holder, clientMachine, 
+      lb = startFileInternal(pc, src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, blockManager.maxReplication, 0);
     } catch (StandbyException se) {
@@ -2723,11 +2733,12 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
-      status = renameToInternal(src, dst);
+      status = renameToInternal(pc, src, dst);
       if (status && isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
       }
@@ -2745,7 +2756,7 @@ public class FSNamesystem implements Nam
 
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
-  private boolean renameToInternal(String src, String dst)
+  private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
     throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     if (isInSafeMode()) {
@@ -2761,8 +2772,8 @@ public class FSNamesystem implements Nam
       //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(actualdst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, actualdst, FsAction.WRITE);
     }
 
     if (dir.renameTo(src, dst)) {
@@ -2780,11 +2791,11 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      renameToInternal(src, dst, options);
+      renameToInternal(pc, src, dst, options);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
       }
@@ -2802,7 +2813,7 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private void renameToInternal(String src, String dst,
+  private void renameToInternal(FSPermissionChecker pc, String src, String dst,
       Options.Rename... options) throws IOException {
     assert hasWriteLock();
     if (isInSafeMode()) {
@@ -2812,8 +2823,8 @@ public class FSNamesystem implements Nam
       throw new InvalidPathException("Invalid name: " + dst);
     }
     if (isPermissionEnabled) {
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(dst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, dst, FsAction.WRITE);
     }
 
     dir.renameTo(src, dst, options);
@@ -2855,6 +2866,10 @@ public class FSNamesystem implements Nam
     return status;
   }
     
+  private FSPermissionChecker getPermissionChecker()
+      throws AccessControlException {
+    return new FSPermissionChecker(fsOwnerShortUserName, supergroup);
+  }
   /**
    * Remove a file/directory from the namespace.
    * <p>
@@ -2871,7 +2886,7 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2882,7 +2897,7 @@ public class FSNamesystem implements Nam
         throw new IOException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks)) {
@@ -2999,9 +3014,8 @@ public class FSNamesystem implements Nam
     throws AccessControlException, UnresolvedLinkException,
            StandbyException, IOException {
     HdfsFileStatus stat = null;
-
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
-
     try {
       checkOperation(OperationCategory.READ);
 
@@ -3009,7 +3023,7 @@ public class FSNamesystem implements Nam
         throw new InvalidPathException("Invalid file name: " + src);
       }
       if (isPermissionEnabled) {
-        checkTraverse(src);
+        checkTraverse(pc, src);
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
@@ -3053,11 +3067,11 @@ public class FSNamesystem implements Nam
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      status = mkdirsInternal(src, permissions, createParent);
+      status = mkdirsInternal(pc, src, permissions, createParent);
     } finally {
       writeUnlock();
     }
@@ -3074,7 +3088,7 @@ public class FSNamesystem implements Nam
   /**
    * Create all the necessary directories
    */
-  private boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
@@ -3082,7 +3096,7 @@ public class FSNamesystem implements Nam
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     if (isPermissionEnabled) {
-      checkTraverse(src);
+      checkTraverse(pc, src);
     }
     if (dir.isDirMutable(src)) {
       // all the users of mkdirs() are used to expect 'true' even if
@@ -3093,7 +3107,7 @@ public class FSNamesystem implements Nam
       throw new InvalidPathException(src);
     }
     if (isPermissionEnabled) {
-      checkAncestorAccess(src, FsAction.WRITE);
+      checkAncestorAccess(pc, src, FsAction.WRITE);
     }
     if (!createParent) {
       verifyParentDir(src);
@@ -3112,12 +3126,13 @@ public class FSNamesystem implements Nam
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, StandbyException {
+    FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
+        supergroup);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
       if (isPermissionEnabled) {
-        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+        checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
       }
       return dir.getContentSummary(src);
     } finally {
@@ -3132,15 +3147,13 @@ public class FSNamesystem implements Nam
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set quota on " + path, safeMode);
       }
-      if (isPermissionEnabled) {
-        checkSuperuserPrivilege();
-      }
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
       writeUnlock();
@@ -3515,15 +3528,16 @@ public class FSNamesystem implements Nam
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
 
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
-          checkPathAccess(src, FsAction.READ_EXECUTE);
+          checkPathAccess(pc, src, FsAction.READ_EXECUTE);
         } else {
-          checkTraverse(src);
+          checkTraverse(pc, src);
         }
       }
       if (isAuditEnabled() && isExternalInvocation()) {
@@ -3834,9 +3848,9 @@ public class FSNamesystem implements Nam
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    checkSuperuserPrivilege();
     readLock();
     try {
-      checkSuperuserPrivilege();
       if (!isInSafeMode()) {
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
@@ -3855,9 +3869,9 @@ public class FSNamesystem implements Nam
    * @throws AccessControlException if superuser privilege is violated.
    */
   boolean restoreFailedStorage(String arg) throws AccessControlException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
-      checkSuperuserPrivilege();
       
       // if it is disabled - enable it and vice versa.
       if(arg.equals("check"))
@@ -3877,10 +3891,10 @@ public class FSNamesystem implements Nam
   }
     
   void finalizeUpgrade() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkSuperuserPrivilege();
       getFSImage().finalizeUpgrade();
     } finally {
       writeUnlock();
@@ -4616,10 +4630,10 @@ public class FSNamesystem implements Nam
   }
 
   CheckpointSignature rollEditLog() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
-      checkSuperuserPrivilege();
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }
@@ -4670,61 +4684,64 @@ public class FSNamesystem implements Nam
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
 
-  private FSPermissionChecker checkOwner(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, true, null, null, null, null);
+  private void checkOwner(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, true, null, null, null, null);
   }
 
-  private FSPermissionChecker checkPathAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, access, null);
+  private void checkPathAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, access, null);
   }
 
-  private FSPermissionChecker checkParentAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, access, null, null);
+  private void checkParentAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, access, null, null);
   }
 
-  private FSPermissionChecker checkAncestorAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, access, null, null, null);
+  private void checkAncestorAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, access, null, null, null);
   }
 
-  private FSPermissionChecker checkTraverse(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, null, null);
+  private void checkTraverse(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, null, null);
   }
 
   @Override
-  public void checkSuperuserPrivilege() throws AccessControlException {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
     if (isPermissionEnabled) {
-      FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
+      FSPermissionChecker pc = getPermissionChecker();
+      pc.checkSuperuserPrivilege();
     }
   }
 
   /**
-   * Check whether current user have permissions to access the path.
-   * For more details of the parameters, see
-   * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
+   * Check whether current user has permissions to access the path. For more
+   * details of the parameters, see
+   * {@link FSPermissionChecker#checkPermission()}.
    */
-  private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
-      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess) throws AccessControlException, UnresolvedLinkException {
-    FSPermissionChecker pc = new FSPermissionChecker(
-        fsOwner.getShortUserName(), supergroup);
-    if (!pc.isSuper) {
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess)
+      throws AccessControlException, UnresolvedLinkException {
+    if (!pc.isSuperUser()) {
       dir.waitForReady();
       readLock();
       try {
-        pc.checkPermission(path, dir.rootDir, doCheckOwner,
-            ancestorAccess, parentAccess, access, subAccess);
+        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess);
       } finally {
         readUnlock();
-      } 
+      }
     }
-    return pc;
   }
-
+  
   /**
    * Check to see if we have exceeded the limit on the number
    * of inodes.
@@ -5168,16 +5185,14 @@ public class FSNamesystem implements Nam
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
 	String[] cookieTab) throws IOException {
-
+    checkSuperuserPrivilege();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
       if (!isPopulatingReplQueues()) {
         throw new IOException("Cannot run listCorruptFileBlocks because " +
                               "replication queues have not been initialized.");
       }
-      checkSuperuserPrivilege();
       // print a limited # of corrupt files per call
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Tue Feb 26 00:10:17 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Stack;
@@ -33,14 +34,20 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
-/** Perform permission checking in {@link FSNamesystem}. */
+/** 
+ * Class that helps in checking file system permission.
+ * The state of this class need not be synchronized as it has data structures that
+ * are read-only.
+ * 
+ * Some of the helper methods are guarded by {@link FSNamesystem#readLock()}.
+ */
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
-
   private final UserGroupInformation ugi;
-  public final String user;
-  private final Set<String> groups = new HashSet<String>();
-  public final boolean isSuper;
+  private final String user;  
+  /** A set with group names. Not synchronized since it is unmodifiable */
+  private final Set<String> groups;
+  private final boolean isSuper;
   
   FSPermissionChecker(String fsOwner, String supergroup
       ) throws AccessControlException{
@@ -49,10 +56,9 @@ class FSPermissionChecker {
     } catch (IOException e) {
       throw new AccessControlException(e); 
     } 
-    
-    groups.addAll(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    groups = Collections.unmodifiableSet(s);
     user = ugi.getShortUserName();
-    
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
 
@@ -62,20 +68,23 @@ class FSPermissionChecker {
    */
   public boolean containsGroup(String group) {return groups.contains(group);}
 
+  public String getUser() {
+    return user;
+  }
+  
+  public boolean isSuperUser() {
+    return isSuper;
+  }
+  
   /**
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
-   * @param owner owner of the system
-   * @param supergroup supergroup of the system
    */
-  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
-                                             String supergroup) 
-                     throws AccessControlException {
-    FSPermissionChecker checker = 
-      new FSPermissionChecker(owner.getShortUserName(), supergroup);
-    if (!checker.isSuper) {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
+    if (!isSuper) {
       throw new AccessControlException("Access denied for user " 
-          + checker.user + ". Superuser privilege is required");
+          + user + ". Superuser privilege is required");
     }
   }
   
@@ -107,6 +116,9 @@ class FSPermissionChecker {
    * If path is not a directory, there is no effect.
    * @throws AccessControlException
    * @throws UnresolvedLinkException
+   * 
+   * Guarded by {@link FSNamesystem#readLock()}
+   * Caller of this method must hold that lock.
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@@ -152,6 +164,7 @@ class FSPermissionChecker {
       }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkOwner(INode inode, Snapshot snapshot
       ) throws AccessControlException {
     if (inode != null && user.equals(inode.getUserName(snapshot))) {
@@ -160,6 +173,7 @@ class FSPermissionChecker {
     throw new AccessControlException("Permission denied");
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkTraverse(INode[] inodes, int last, Snapshot snapshot
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
@@ -167,6 +181,7 @@ class FSPermissionChecker {
     }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkSubAccess(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
@@ -186,11 +201,13 @@ class FSPermissionChecker {
     }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode[] inodes, int i, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     check(i >= 0? inodes[i]: null, snapshot, access);
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null) {
@@ -211,6 +228,7 @@ class FSPermissionChecker {
         + ", access=" + access + ", inode=" + inode.getFullPathName());
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkStickyBit(INode parent, INode inode, Snapshot snapshot
       ) throws AccessControlException {
     if(!parent.getFsPermission(snapshot).getStickyBit()) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Tue Feb 26 00:10:17 2013
@@ -468,7 +468,11 @@ public abstract class INode implements D
 
   String getLocalParentDir() {
     INode inode = isRoot() ? this : getParent();
-    return (inode != null) ? inode.getFullPathName() : "";
+    String parentDir = "";
+    if (inode != null) {
+      parentDir = inode.getFullPathName();
+    }
+    return (parentDir != null) ? parentDir : "";
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java Tue Feb 26 00:10:17 2013
@@ -267,4 +267,11 @@ class RedundantEditLogInputStream extend
       super(msg);
     }
   }
+
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    for (EditLogInputStream elis : streams) {
+      elis.setMaxOpSize(maxOpSize);
+    }
+  }
 }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1448505-1449957

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1448505-1449957

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1448505-1449957

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1448505-1449957

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1448505-1449957

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Tue Feb 26 00:10:17 2013
@@ -861,6 +861,11 @@ public class TestEditLog {
     public boolean isInProgress() {
       return true;
     }
+
+    @Override
+    public void setMaxOpSize(int maxOpSize) {
+      reader.setMaxOpSize(maxOpSize);
+    }
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java?rev=1449976&r1=1449975&r2=1449976&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java Tue Feb 26 00:10:17 2013
@@ -83,6 +83,7 @@ public class TestNameNodeRecovery {
       elfos.close();
       elfos = null;
       elfis = new EditLogFileInputStream(TEST_LOG_NAME);
+      elfis.setMaxOpSize(elts.getMaxOpSize());
       
       // reading through normally will get you an exception
       Set<Long> validTxIds = elts.getValidTxIds();
@@ -143,7 +144,7 @@ public class TestNameNodeRecovery {
   /**
    * A test scenario for the edit log
    */
-  private interface EditLogTestSetup {
+  private static abstract class EditLogTestSetup {
     /** 
      * Set up the edit log.
      */
@@ -162,6 +163,13 @@ public class TestNameNodeRecovery {
      * edit log.
      **/
     abstract public Set<Long> getValidTxIds();
+    
+    /**
+     * Return the maximum opcode size we will use for input.
+     */
+    public int getMaxOpSize() {
+      return DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
+    }
   }
   
   static void padEditLog(EditLogOutputStream elos, int paddingLength)
@@ -182,10 +190,10 @@ public class TestNameNodeRecovery {
   }
 
   static void addDeleteOpcode(EditLogOutputStream elos,
-        OpInstanceCache cache) throws IOException {
+        OpInstanceCache cache, long txId, String path) throws IOException {
     DeleteOp op = DeleteOp.getInstance(cache);
-    op.setTransactionId(0x0);
-    op.setPath("/foo");
+    op.setTransactionId(txId);
+    op.setPath(path);
     op.setTimestamp(0);
     elos.write(op);
   }
@@ -198,7 +206,7 @@ public class TestNameNodeRecovery {
    * able to handle any amount of padding (including no padding) without
    * throwing an exception.
    */
-  private static class EltsTestEmptyLog implements EditLogTestSetup {
+  private static class EltsTestEmptyLog extends EditLogTestSetup {
     private int paddingLength;
 
     public EltsTestEmptyLog(int paddingLength) {
@@ -243,13 +251,49 @@ public class TestNameNodeRecovery {
   }
 
   /**
+   * Test using a non-default maximum opcode length.
+   */
+  private static class EltsTestNonDefaultMaxOpSize extends EditLogTestSetup {
+    public EltsTestNonDefaultMaxOpSize() {
+    }
+
+    @Override
+    public void addTransactionsToLog(EditLogOutputStream elos,
+        OpInstanceCache cache) throws IOException {
+      addDeleteOpcode(elos, cache, 0, "/foo");
+      addDeleteOpcode(elos, cache, 1,
+       "/supercalifragalisticexpialadocius.supercalifragalisticexpialadocius");
+    }
+
+    @Override
+    public long getLastValidTxId() {
+      return 0;
+    }
+
+    @Override
+    public Set<Long> getValidTxIds() {
+      return Sets.newHashSet(0L);
+    } 
+    
+    public int getMaxOpSize() {
+      return 30;
+    }
+  }
+
+  /** Test reading an edit log with a non-default maximum opcode size */
+  @Test(timeout=180000)
+  public void testNonDefaultMaxOpSize() throws IOException {
+    runEditLogTest(new EltsTestNonDefaultMaxOpSize());
+  }
+
+  /**
    * Test the scenario where an edit log contains some padding (0xff) bytes
    * followed by valid opcode data.
    *
    * These edit logs are corrupt, but all the opcodes should be recoverable
    * with recovery mode.
    */
-  private static class EltsTestOpcodesAfterPadding implements EditLogTestSetup {
+  private static class EltsTestOpcodesAfterPadding extends EditLogTestSetup {
     private int paddingLength;
 
     public EltsTestOpcodesAfterPadding(int paddingLength) {
@@ -260,7 +304,7 @@ public class TestNameNodeRecovery {
     public void addTransactionsToLog(EditLogOutputStream elos,
         OpInstanceCache cache) throws IOException {
       padEditLog(elos, paddingLength);
-      addDeleteOpcode(elos, cache);
+      addDeleteOpcode(elos, cache, 0, "/foo");
     }
 
     @Override
@@ -286,7 +330,7 @@ public class TestNameNodeRecovery {
         3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
   }
 
-  private static class EltsTestGarbageInEditLog implements EditLogTestSetup {
+  private static class EltsTestGarbageInEditLog extends EditLogTestSetup {
     final private long BAD_TXID = 4;
     final private long MAX_TXID = 10;
     



Mime
View raw message