hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1449539 - in /hadoop/common/branches/branch-1: ./ src/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Sun, 24 Feb 2013 19:50:51 GMT
Author: suresh
Date: Sun Feb 24 19:50:51 2013
New Revision: 1449539

URL: http://svn.apache.org/r1449539
Log:
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when configured to use LDAP and LDAP has issues. Contributed by Xiaobo Peng and Suresh Srinivas.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1449539&r1=1449538&r2=1449539&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sun Feb 24 19:50:51 2013
@@ -171,6 +171,9 @@ Release 1.2.0 - unreleased
     HADOOP-9253. Capture ulimit info in the logs at service start time.
     (Arpit Gupta via suresh)
 
+    HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 
+    configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
+
   OPTIMIZATIONS
 
     HDFS-2533. Backport: Remove needless synchronization on some FSDataSet

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1449539&r1=1449538&r2=1449539&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Feb 24 19:50:51 2013
@@ -189,6 +189,7 @@ public class FSNamesystem implements FSC
   private boolean isPermissionEnabled;
   private boolean persistBlocks;
   private UserGroupInformation fsOwner;
+  private String fsOwnerShortUserName;
   private String supergroup;
   private PermissionStatus defaultPermission;
   // FSNamesystemMetrics counter variables
@@ -511,6 +512,7 @@ public class FSNamesystem implements FSC
                                           throws IOException {
     fsNamesystemObject = this;
     fsOwner = UserGroupInformation.getCurrentUser();
+    this.fsOwnerShortUserName = fsOwner.getShortUserName();
     LOG.info("fsOwner=" + fsOwner);
 
     this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
@@ -523,7 +525,7 @@ public class FSNamesystem implements FSC
     
     short filePermission = (short)conf.getInt("dfs.upgrade.permission", 0777);
     this.defaultPermission = PermissionStatus.createImmutable(
-        fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
+        fsOwnerShortUserName, supergroup, new FsPermission(filePermission));
 
     this.blocksInvalidateWorkPct = 
               DFSUtil.getInvalidateWorkPctPerIteration(conf);
@@ -733,91 +735,90 @@ public class FSNamesystem implements FSC
   /**
    * Dump all metadata into specified file
    */
-  synchronized void metaSave(String filename) throws IOException {
+  void metaSave(String filename) throws IOException {
     checkSuperuserPrivilege();
-    File file = new File(System.getProperty("hadoop.log.dir"), 
-                         filename);
-    PrintWriter out = new PrintWriter(new BufferedWriter(
-                                                         new FileWriter(file, true)));
- 
-    long totalInodes = this.dir.totalInodes();
-    long totalBlocks = this.getBlocksTotal();
+    synchronized (this) {
+      File file = new File(System.getProperty("hadoop.log.dir"), filename);
+      PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+          true)));
+
+      long totalInodes = this.dir.totalInodes();
+      long totalBlocks = this.getBlocksTotal();
+
+      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      this.DFSNodesStatus(live, dead);
+
+      String str = totalInodes + " files and directories, " + totalBlocks
+          + " blocks = " + (totalInodes + totalBlocks) + " total";
+      out.println(str);
+      out.println("Live Datanodes: " + live.size());
+      out.println("Dead Datanodes: " + dead.size());
 
-    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    this.DFSNodesStatus(live, dead);
-    
-    String str = totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks) + " total";
-    out.println(str);
-    out.println("Live Datanodes: "+live.size());
-    out.println("Dead Datanodes: "+dead.size());
+      //
+      // Dump contents of neededReplication
+      //
+      synchronized (neededReplications) {
+        out.println("Metasave: Blocks waiting for replication: "
+            + neededReplications.size());
+        for (Block block : neededReplications) {
+          List<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
+          NumberReplicas numReplicas = new NumberReplicas();
+          // source node returned is not used
+          chooseSourceDatanode(block, containingNodes, numReplicas);
+          int usableReplicas = numReplicas.liveReplicas()
+              + numReplicas.decommissionedReplicas();
+
+          if (block instanceof BlockInfo) {
+            String fileName = FSDirectory.getFullPathName(((BlockInfo) block)
+                .getINode());
+            out.print(fileName + ": ");
+          }
 
-    //
-    // Dump contents of neededReplication
-    //
-    synchronized (neededReplications) {
-      out.println("Metasave: Blocks waiting for replication: " + 
-                  neededReplications.size());
-      for (Block block : neededReplications) {
-        List<DatanodeDescriptor> containingNodes =
-                                          new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        // source node returned is not used
-        chooseSourceDatanode(block, containingNodes, numReplicas);
-        int usableReplicas = numReplicas.liveReplicas() + 
-                             numReplicas.decommissionedReplicas(); 
-
-        if (block instanceof BlockInfo) {
-          String fileName = FSDirectory.getFullPathName(((BlockInfo) block)
-              .getINode());
-          out.print(fileName + ": ");
-        }
-
-        // l: == live:, d: == decommissioned c: == corrupt e: == excess
-        out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
-                  " (replicas:" +
-                  " l: " + numReplicas.liveReplicas() + 
-                  " d: " + numReplicas.decommissionedReplicas() + 
-                  " c: " + numReplicas.corruptReplicas() + 
-                  " e: " + numReplicas.excessReplicas() + ") ");
-
-        Collection<DatanodeDescriptor> corruptNodes =
-                                       corruptReplicas.getNodes(block);
-
-        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-             jt.hasNext();) {
-          DatanodeDescriptor node = jt.next();
-          String state = "";
-          if (corruptNodes != null && corruptNodes.contains(node)) {
-            state = "(corrupt)";
-          } else if (node.isDecommissioned() ||
-                     node.isDecommissionInProgress()) {
-            state = "(decommissioned)";
+          // l: == live:, d: == decommissioned c: == corrupt e: == excess
+          out.print(block + ((usableReplicas > 0) ? "" : " MISSING")
+              + " (replicas:" + " l: " + numReplicas.liveReplicas() + " d: "
+              + numReplicas.decommissionedReplicas() + " c: "
+              + numReplicas.corruptReplicas() + " e: "
+              + numReplicas.excessReplicas() + ") ");
+
+          Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
+              .getNodes(block);
+
+          for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block); jt
+              .hasNext();) {
+            DatanodeDescriptor node = jt.next();
+            String state = "";
+            if (corruptNodes != null && corruptNodes.contains(node)) {
+              state = "(corrupt)";
+            } else if (node.isDecommissioned()
+                || node.isDecommissionInProgress()) {
+              state = "(decommissioned)";
+            }
+            out.print(" " + node + state + " : ");
           }
-          out.print(" " + node + state + " : ");
+          out.println("");
         }
-        out.println("");
       }
-    }
 
-    //
-    // Dump blocks from pendingReplication
-    //
-    pendingReplications.metaSave(out);
+      //
+      // Dump blocks from pendingReplication
+      //
+      pendingReplications.metaSave(out);
 
-    //
-    // Dump blocks that are waiting to be deleted
-    //
-    dumpRecentInvalidateSets(out);
+      //
+      // Dump blocks that are waiting to be deleted
+      //
+      dumpRecentInvalidateSets(out);
 
-    //
-    // Dump all datanodes
-    //
-    datanodeDump(out);
+      //
+      // Dump all datanodes
+      //
+      datanodeDump(out);
 
-    out.flush();
-    out.close();
+      out.flush();
+      out.close();
+    }
   }
 
   /**
@@ -889,43 +890,47 @@ public class FSNamesystem implements FSC
    * @param datanode on which blocks are located
    * @param size total size of blocks
    */
-  synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+  BlocksWithLocations getBlocks(DatanodeID datanode, long size)
       throws IOException {
     checkSuperuserPrivilege();
 
-    DatanodeDescriptor node = getDatanode(datanode);
-    if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
-          + "Asking for blocks from an unrecorded node " + datanode.getName());
-      throw new IllegalArgumentException(
-          "Unexpected exception.  Got getBlocks message for datanode " + 
-          datanode.getName() + ", but there is no info for it");
-    }
+    synchronized (this) {
+      DatanodeDescriptor node = getDatanode(datanode);
+      if (node == null) {
+        NameNode.stateChangeLog
+            .warn("BLOCK* NameSystem.getBlocks: "
+                + "Asking for blocks from an unrecorded node "
+                + datanode.getName());
+        throw new IllegalArgumentException(
+            "Unexpected exception.  Got getBlocks message for datanode "
+                + datanode.getName() + ", but there is no info for it");
+      }
 
-    int numBlocks = node.numBlocks();
-    if(numBlocks == 0) {
-      return new BlocksWithLocations(new BlockWithLocations[0]);
-    }
-    Iterator<Block> iter = node.getBlockIterator();
-    int startBlock = r.nextInt(numBlocks); // starting from a random block
-    // skip blocks
-    for(int i=0; i<startBlock; i++) {
-      iter.next();
-    }
-    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
-    long totalSize = 0;
-    while(totalSize<size && iter.hasNext()) {
-      totalSize += addBlock(iter.next(), results);
-    }
-    if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
+      int numBlocks = node.numBlocks();
+      if (numBlocks == 0) {
+        return new BlocksWithLocations(new BlockWithLocations[0]);
+      }
+      Iterator<Block> iter = node.getBlockIterator();
+      int startBlock = r.nextInt(numBlocks); // starting from a random block
+      // skip blocks
+      for (int i = 0; i < startBlock; i++) {
+        iter.next();
+      }
+      List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+      long totalSize = 0;
+      while (totalSize < size && iter.hasNext()) {
         totalSize += addBlock(iter.next(), results);
       }
+      if (totalSize < size) {
+        iter = node.getBlockIterator(); // start from the beginning
+        for (int i = 0; i < startBlock && totalSize < size; i++) {
+          totalSize += addBlock(iter.next(), results);
+        }
+      }
+
+      return new BlocksWithLocations(
+          results.toArray(new BlockWithLocations[results.size()]));
     }
-    
-    return new BlocksWithLocations(
-        results.toArray(new BlockWithLocations[results.size()]));
   }
   
   /**
@@ -974,10 +979,12 @@ public class FSNamesystem implements FSC
    */
   public void setPermission(String src, FsPermission permission
       ) throws IOException {
+    FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
+        supergroup);
     synchronized (this) {
       if (isInSafeMode())
          throw new SafeModeException("Cannot set permission for " + src, safeMode);
-      checkOwner(src);
+      checkOwner(pc, src);
       dir.setPermission(src, permission);
     }
     getEditLog().logSync();
@@ -995,12 +1002,14 @@ public class FSNamesystem implements FSC
    */
   public void setOwner(String src, String username, String group
       ) throws IOException {
+    FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
+        supergroup);
     synchronized (this) {
       if (isInSafeMode())
          throw new SafeModeException("Cannot set owner for " + src, safeMode);
-      FSPermissionChecker pc = checkOwner(src);
-      if (!pc.isSuper) {
-        if (username != null && !pc.user.equals(username)) {
+      checkOwner(pc, src);
+      if (!pc.isSuperUser()) {
+        if (username != null && !pc.getUser().equals(username)) {
           throw new AccessControlException("Non-super user cannot change owner");
         }
         if (group != null && !pc.containsGroup(group)) {
@@ -1075,7 +1084,8 @@ public class FSNamesystem implements FSC
       boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
       throws IOException {
     if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.READ);
+      FSPermissionChecker pc = getPermissionChecker();
+      checkPathAccess(pc, src, FsAction.READ);
     }
 
     if (offset < 0) {
@@ -1232,12 +1242,13 @@ public class FSNamesystem implements FSC
       }
     }
 
+    FSPermissionChecker pc = getPermissionChecker();
     HdfsFileStatus resultingStat = null;
     synchronized(this) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
-      concatInternal(target, srcs);
+      concatInternal(pc, target, srcs);
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target);
       }
@@ -1251,17 +1262,17 @@ public class FSNamesystem implements FSC
   }
 
   /** See {@link #concat(String, String[])} */
-  private void concatInternal(String target, String [] srcs) 
+  private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) 
       throws IOException {
 
     // write permission for the target
     if (isPermissionEnabled) {
-      checkPathAccess(target, FsAction.WRITE);
+      checkPathAccess(pc, target, FsAction.WRITE);
 
       // and srcs
       for(String aSrc: srcs) {
-        checkPathAccess(aSrc, FsAction.READ); // read the file
-        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete 
       }
     }
 
@@ -1358,30 +1369,34 @@ public class FSNamesystem implements FSC
    * The access time is precise upto an hour. The transaction, if needed, is
    * written to the edits log but is not flushed.
    */
-  public synchronized void setTimes(String src, long mtime, long atime) throws IOException {
+  public void setTimes(String src, long mtime, long atime) throws IOException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set dfs.access.time.precision configuration parameter");
     }
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot set accesstimes  for " + src, safeMode);
-    }
-    //
-    // The caller needs to have write access to set access & modification times.
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-    INodeFile inode = dir.getFileINode(src);
-    if (inode != null) {
-      dir.setTimes(src, inode, mtime, atime, true);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-        final HdfsFileStatus stat = dir.getFileInfo(src);
-        logAuditEvent(UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
-                      "setTimes", src, null, stat);
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set accesstimes  for " + src,
+            safeMode);
+      }
+      //
+      // The caller needs to have write access to set access & modification
+      // times.
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
+      INodeFile inode = dir.getFileINode(src);
+      if (inode != null) {
+        dir.setTimes(src, inode, mtime, atime, true);
+        if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+          final HdfsFileStatus stat = dir.getFileInfo(src);
+          logAuditEvent(UserGroupInformation.getCurrentUser(),
+              Server.getRemoteIp(), "setTimes", src, null, stat);
+        }
+      } else {
+        throw new FileNotFoundException("File " + src + " does not exist");
       }
-    } else {
-      throw new FileNotFoundException("File " + src + " does not exist");
     }
   }
 
@@ -1410,45 +1425,50 @@ public class FSNamesystem implements FSC
     return status;
   }
 
-  private synchronized boolean setReplicationInternal(String src, 
+  private boolean setReplicationInternal(String src,
                                              short replication
                                              ) throws IOException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set replication for " + src, safeMode);
-    verifyReplication(src, replication, null);
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isInSafeMode())
+        throw new SafeModeException("Cannot set replication for " + src,
+            safeMode);
+      verifyReplication(src, replication, null);
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
 
-    int[] oldReplication = new int[1];
-    Block[] fileBlocks;
-    fileBlocks = dir.setReplication(src, replication, oldReplication);
-    if (fileBlocks == null)  // file not found or is a directory
-      return false;
-    int oldRepl = oldReplication[0];
-    if (oldRepl == replication) // the same replication
-      return true;
+      int[] oldReplication = new int[1];
+      Block[] fileBlocks;
+      fileBlocks = dir.setReplication(src, replication, oldReplication);
+      if (fileBlocks == null) // file not found or is a directory
+        return false;
+      int oldRepl = oldReplication[0];
+      if (oldRepl == replication) // the same replication
+        return true;
 
-    // update needReplication priority queues
-    for(int idx = 0; idx < fileBlocks.length; idx++)
-      updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
-      
-    if (oldRepl > replication) {  
-      // old replication > the new one; need to remove copies
-      LOG.info("Reducing replication for " + src 
-               + ". New replication is " + replication);
+      // update needReplication priority queues
       for(int idx = 0; idx < fileBlocks.length; idx++)
-        processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
-    } else { // replication factor is increased
-      LOG.info("Increasing replication for " + src 
-          + ". New replication is " + replication);
+        updateNeededReplications(fileBlocks[idx], 0, replication - oldRepl);
+
+      if (oldRepl > replication) {
+        // old replication > the new one; need to remove copies
+        LOG.info("Reducing replication for " + src 
+            + ". New replication is " + replication);
+        for (int idx = 0; idx < fileBlocks.length; idx++)
+          processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
+      } else { // replication factor is increased
+        LOG.info("Increasing replication for " + src
+            + ". New replication is " + replication);
+      }
     }
     return true;
   }
     
   long getPreferredBlockSize(String filename) throws IOException {
     if (isPermissionEnabled) {
-      checkTraverse(filename);
+      FSPermissionChecker pc = getPermissionChecker();
+      checkTraverse(pc, filename);
     }
     return dir.getPreferredBlockSize(filename);
   }
@@ -1515,7 +1535,7 @@ public class FSNamesystem implements FSC
     }
   }
 
-  private synchronized void startFileInternal(String src,
+  private void startFileInternal(String src,
                                               PermissionStatus permissions,
                                               String holder, 
                                               String clientMachine, 
@@ -1535,103 +1555,101 @@ public class FSNamesystem implements FSC
           + ", append=" + append);
     }
 
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create " + src, safeMode);
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid name: " + src);
-    }
-
-    // Verify that the destination does not exist as a directory already.
-    boolean pathExists = dir.exists(src);
-    if (pathExists && dir.isDir(src)) {
-      throw new IOException("Cannot create "+ src + "; already exists as a directory");
-    }
-
-    if (isPermissionEnabled) {
-      if (append || (overwrite && pathExists)) {
-        checkPathAccess(src, FsAction.WRITE);
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isInSafeMode())
+        throw new SafeModeException("Cannot create " + src, safeMode);
+      if (!DFSUtil.isValidName(src)) {
+        throw new IOException("Invalid name: " + src);
       }
-      else {
-        checkAncestorAccess(src, FsAction.WRITE);
+
+      // Verify that the destination does not exist as a directory already.
+      boolean pathExists = dir.exists(src);
+      if (pathExists && dir.isDir(src)) {
+        throw new IOException("Cannot create "+ src + "; already exists as a directory");
       }
-    }
 
-    if (!createParent) {
-      verifyParentDir(src);
-    }
+      if (isPermissionEnabled) {
+        if (append || (overwrite && pathExists)) {
+          checkPathAccess(pc, src, FsAction.WRITE);
+        } else {
+          checkAncestorAccess(pc, src, FsAction.WRITE);
+        }
+      }
 
-    try {
-      INode myFile = dir.getFileINode(src);
-      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+      if (!createParent) {
+        verifyParentDir(src);
+      }
 
       try {
-        verifyReplication(src, replication, clientMachine);
-      } catch(IOException e) {
-        throw new IOException("failed to create "+e.getMessage());
-      }
-      if (append) {
-        if (myFile == null) {
-          throw new FileNotFoundException("failed to append to non-existent "
-              + src + " on client " + clientMachine);
-        } else if (myFile.isDirectory()) {
-          throw new IOException("failed to append to directory " + src 
-                                +" on client " + clientMachine);
-        }
-      } else if (!dir.isValidToCreate(src)) {
-        if (overwrite) {
-          delete(src, true);
-        } else {
-          throw new IOException("failed to create " + src 
-                                +" on client " + clientMachine
-                                +" either because the filename is invalid or the file exists");
+        INode myFile = dir.getFileINode(src);
+        recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+
+        try {
+          verifyReplication(src, replication, clientMachine);
+        } catch (IOException e) {
+          throw new IOException("failed to create " + e.getMessage());
+        }
+        if (append) {
+          if (myFile == null) {
+            throw new FileNotFoundException("failed to append to non-existent "
+                + src + " on client " + clientMachine);
+          } else if (myFile.isDirectory()) {
+            throw new IOException("failed to append to directory " + src
+                + " on client " + clientMachine);
+          }
+        } else if (!dir.isValidToCreate(src)) {
+          if (overwrite) {
+            delete(src, true);
+          } else {
+            throw new IOException("failed to create file " + src
+                + " on client " + clientMachine
+                + " either because the filename is invalid or the file exists");
+          }
         }
-      }
 
-      DatanodeDescriptor clientNode = 
-        host2DataNodeMap.getDatanodeByHost(clientMachine);
+        DatanodeDescriptor clientNode = host2DataNodeMap
+            .getDatanodeByHost(clientMachine);
 
-      if (append) {
-        //
-        // Replace current node with a INodeUnderConstruction.
-        // Recreate in-memory lease record.
-        //
-        INodeFile node = (INodeFile) myFile;
-        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                        node.getLocalNameBytes(),
-                                        node.getReplication(),
-                                        node.getModificationTime(),
-                                        node.getPreferredBlockSize(),
-                                        node.getBlocks(),
-                                        node.getPermissionStatus(),
-                                        holder,
-                                        clientMachine,
-                                        clientNode);
-        dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.clientName, src);
+        if (append) {
+          //
+          // Replace current node with a INodeUnderConstruction.
+          // Recreate in-memory lease record.
+          //
+          INodeFile node = (INodeFile) myFile;
+          INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+              node.getLocalNameBytes(), node.getReplication(),
+              node.getModificationTime(), node.getPreferredBlockSize(),
+              node.getBlocks(), node.getPermissionStatus(), holder,
+              clientMachine, clientNode);
+          dir.replaceNode(src, node, cons);
+          leaseManager.addLease(cons.clientName, src);
 
-      } else {
-       // Now we can add the name to the filesystem. This file has no
-       // blocks associated with it.
-       //
-       checkFsObjectLimit();
-
-        // increment global generation stamp
-        long genstamp = nextGenerationStamp();
-        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
-            replication, blockSize, holder, clientMachine, clientNode, genstamp);
-        if (newNode == null) {
-          throw new IOException("DIR* startFile: Unable to add to namespace");
-        }
-        leaseManager.addLease(newNode.clientName, src);
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* startFile: "
-                                     +"add "+src+" to namespace for "+holder);
-        }
-      }
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* startFile: "
-                                   +ie.getMessage());
-      throw ie;
+        } else {
+          // Now we can add the name to the filesystem. This file has no
+          // blocks associated with it.
+          //
+          checkFsObjectLimit();
+
+          // increment global generation stamp
+          long genstamp = nextGenerationStamp();
+          INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
+              replication, blockSize, holder, clientMachine, clientNode,
+              genstamp);
+          if (newNode == null) {
+            throw new IOException("DIR* startFile: Unable to add to namespace");
+          }
+          leaseManager.addLease(newNode.clientName, src);
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug("DIR* startFile: "
+                                       +"add "+src+" to namespace for "+holder);
+          }
+        }
+      } catch (IOException ie) {
+        NameNode.stateChangeLog.warn("DIR* startFile: "
+                                     +ie.getMessage());
+        throw ie;
+      }
     }
   }
 
@@ -1646,29 +1664,32 @@ public class FSNamesystem implements FSC
    * @return true if the file is already closed
    * @throws IOException
    */
-  synchronized boolean recoverLease(String src, String holder, String clientMachine)
+  boolean recoverLease(String src, String holder, String clientMachine)
   throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot recover the lease of " + src, safeMode);
-    }
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid name: " + src);
-    }
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot recover the lease of " + src,
+            safeMode);
+      }
+      if (!DFSUtil.isValidName(src)) {
+        throw new IOException("Invalid name: " + src);
+      }
 
-    INode inode = dir.getFileINode(src);
-    if (inode == null) {
-      throw new FileNotFoundException("File not found " + src);
-    }
+      INode inode = dir.getFileINode(src);
+      if (inode == null) {
+        throw new FileNotFoundException("File not found " + src);
+      }
 
-    if (!inode.isUnderConstruction()) {
-      return true;
-    }
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
+      if (!inode.isUnderConstruction()) {
+        return true;
+      }
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
 
-    recoverLeaseInternal(inode, src, holder, clientMachine, true);
+      recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    }
     return false;
   }
   
@@ -2287,28 +2308,31 @@ public class FSNamesystem implements FSC
     return status;
   }
 
-  private synchronized boolean renameToInternal(String src, String dst
+  private boolean renameToInternal(String src, String dst
       ) throws IOException {
     NameNode.stateChangeLog.debug("DIR* renameTo: " + src + " to " + dst);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot rename " + src, safeMode);
-    if (!DFSUtil.isValidName(dst)) {
-      throw new IOException("Invalid name: " + dst);
-    }
-
-    if (isPermissionEnabled) {
-      //We should not be doing this.  This is move() not renameTo().
-      //but for now,
-      String actualdst = dir.isDir(dst)?
-          dst + Path.SEPARATOR + new Path(src).getName(): dst;
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(actualdst, FsAction.WRITE);
-    }
-
-    HdfsFileStatus dinfo = dir.getFileInfo(dst);
-    if (dir.renameTo(src, dst)) {
-      changeLease(src, dst, dinfo);     // update lease with new filename
-      return true;
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isInSafeMode())
+        throw new SafeModeException("Cannot rename " + src, safeMode);
+      if (!DFSUtil.isValidName(dst)) {
+        throw new IOException("Invalid name: " + dst);
+      }
+  
+      if (isPermissionEnabled) {
+        //We should not be doing this.  This is move() not renameTo().
+        //but for now,
+        String actualdst = dir.isDir(dst)?
+            dst + Path.SEPARATOR + new Path(src).getName(): dst;
+        checkParentAccess(pc, src, FsAction.WRITE);
+        checkAncestorAccess(pc, actualdst, FsAction.WRITE);
+      }
+  
+      HdfsFileStatus dinfo = dir.getFileInfo(dst);
+      if (dir.renameTo(src, dst)) {
+        changeLease(src, dst, dinfo);     // update lease with new filename
+        return true;
+      }
     }
     return false;
   }
@@ -2344,13 +2368,15 @@ public class FSNamesystem implements FSC
    */
   private boolean deleteInternal(String src,
       boolean enforcePermission) throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
     synchronized (this) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null,
+            FsAction.ALL);
       }
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks)) {
@@ -2407,10 +2433,13 @@ public class FSNamesystem implements FSC
    *         or null if file not found
    */
   HdfsFileStatus getFileInfo(String src) throws IOException {
-    if (isPermissionEnabled) {
-      checkTraverse(src);
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isPermissionEnabled) {
+        checkTraverse(pc, src);
+      }
+      return dir.getFileInfo(src);
     }
-    return dir.getFileInfo(src);
   }
 
   /**
@@ -2432,42 +2461,48 @@ public class FSNamesystem implements FSC
   /**
    * Create all the necessary directories
    */
-  private synchronized boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(String src,
       PermissionStatus permissions) throws IOException {
     NameNode.stateChangeLog.debug("DIR* mkdirs: " + src);
-    if (isPermissionEnabled) {
-      checkTraverse(src);
-    }
-    if (dir.isDir(src)) {
-      // all the users of mkdirs() are used to expect 'true' even if
-      // a new directory is not created.
-      return true;
-    }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create directory " + src, safeMode);
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid directory name: " + src);
-    }
-    if (isPermissionEnabled) {
-      checkAncestorAccess(src, FsAction.WRITE);
-    }
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isPermissionEnabled) {
+        checkTraverse(pc, src);
+      }
+      if (dir.isDir(src)) {
+        // all the users of mkdirs() are used to expect 'true' even if
+        // a new directory is not created.
+        return true;
+      }
+      if (isInSafeMode())
+        throw new SafeModeException("Cannot create directory " + src, safeMode);
+      if (!DFSUtil.isValidName(src)) {
+        throw new IOException("Invalid directory name: " + src);
+      }
+      if (isPermissionEnabled) {
+        checkAncestorAccess(pc, src, FsAction.WRITE);
+      }
 
-    // validate that we have enough inodes. This is, at best, a 
-    // heuristic because the mkdirs() operation migth need to 
-    // create multiple inodes.
-    checkFsObjectLimit();
+      // validate that we have enough inodes. This is, at best, a
+      // heuristic because the mkdirs() operation might need to
+      // create multiple inodes.
+      checkFsObjectLimit();
 
-    if (!dir.mkdirs(src, permissions, false, now())) {
-      throw new IOException("Invalid directory name: " + src);
+      if (!dir.mkdirs(src, permissions, false, now())) {
+        throw new IOException("Invalid directory name: " + src);
+      }
+      return true;
     }
-    return true;
   }
 
   ContentSummary getContentSummary(String src) throws IOException {
-    if (isPermissionEnabled) {
-      checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isPermissionEnabled) {
+        checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
+      }
+      return dir.getContentSummary(src);
     }
-    return dir.getContentSummary(src);
   }
 
   /**
@@ -2476,12 +2511,10 @@ public class FSNamesystem implements FSC
    * contract.
    */
   void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
-   synchronized (this) {
-     if (isInSafeMode())
+    checkSuperuserPrivilege();
+    synchronized (this) {
+      if (isInSafeMode())
         throw new SafeModeException("Cannot set quota on " + path, safeMode); 
-     if (isPermissionEnabled) {
-        checkSuperuserPrivilege();
-      }
     
       dir.setQuota(path, nsQuota, dsQuota);
     }
@@ -2724,20 +2757,21 @@ public class FSNamesystem implements FSC
    */
   public DirectoryListing getListing(String src, byte[] startAfter)
   throws IOException {
-    if (isPermissionEnabled) {
-      if (dir.isDir(src)) {
-        checkPathAccess(src, FsAction.READ_EXECUTE);
+    FSPermissionChecker pc = getPermissionChecker();
+    synchronized (this) {
+      if (isPermissionEnabled) {
+        if (dir.isDir(src)) {
+          checkPathAccess(pc, src, FsAction.READ_EXECUTE);
+        } else {
+          checkTraverse(pc, src);
+        }
       }
-      else {
-        checkTraverse(src);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(UserGroupInformation.getCurrentUser(),
+            Server.getRemoteIp(), "listStatus", src, null, null);
       }
+      return dir.getListing(src, startAfter);
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
-                    "listStatus", src, null, null);
-    }
-    return dir.getListing(src, startAfter);
   }
 
   /////////////////////////////////////////////////////////
@@ -4575,16 +4609,18 @@ public class FSNamesystem implements FSC
     return nodes;
   }
 
-  public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
+  public DatanodeInfo[] datanodeReport(DatanodeReportType type
       ) throws AccessControlException {
     checkSuperuserPrivilege();
 
-    ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
-    DatanodeInfo[] arr = new DatanodeInfo[results.size()];
-    for (int i=0; i<arr.length; i++) {
-      arr[i] = new DatanodeInfo(results.get(i));
+    synchronized (this) {
+      ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+      DatanodeInfo[] arr = new DatanodeInfo[results.size()];
+      for (int i = 0; i < arr.length; i++) {
+        arr[i] = new DatanodeInfo(results.get(i));
+      }
+      return arr;
     }
-    return arr;
   }
 
   /**
@@ -4595,14 +4631,16 @@ public class FSNamesystem implements FSC
    * @throws AccessControlException if superuser privilege is violated.
    * @throws IOException if 
    */
-  synchronized void saveNamespace() throws AccessControlException, IOException {
+  void saveNamespace() throws AccessControlException, IOException {
     checkSuperuserPrivilege();
-    if(!isInSafeMode()) {
-      throw new IOException("Safe mode should be turned ON " +
-                            "in order to create namespace image");
+    synchronized (this) {
+      if(!isInSafeMode()) {
+        throw new IOException("Safe mode should be turned ON " +
+                              "in order to create namespace image");
+      }
+      getFSImage().saveNamespace(true);
+      LOG.info("New namespace image has been created");
     }
-    getFSImage().saveNamespace(true);
-    LOG.info("New namespace image has been created");
   }
 
   /**
@@ -4930,7 +4968,9 @@ public class FSNamesystem implements FSC
     
   void finalizeUpgrade() throws IOException {
     checkSuperuserPrivilege();
-    getFSImage().finalizeUpgrade();
+    synchronized (this) {
+      getFSImage().finalizeUpgrade();
+    }
   }
 
   /**
@@ -5572,12 +5612,14 @@ public class FSNamesystem implements FSC
   }
 
   synchronized CheckpointSignature rollEditLog() throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException("Checkpoint not created",
-                                  safeMode);
+    checkSuperuserPrivilege();
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Log not rolled", safeMode);
+      }
+      LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
+      return getFSImage().rollEditLog();
     }
-    LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
-    return getFSImage().rollEditLog();
   }
 
   synchronized void rollFSImage() throws IOException {
@@ -5632,33 +5674,48 @@ public class FSNamesystem implements FSC
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
 
-  private FSPermissionChecker checkOwner(String path) throws AccessControlException {
-    return checkPermission(path, true, null, null, null, null);
+  /*
+   * Caller of this method should NOT hold FSNamesystem lock. Otherwise, if
+   * FSPermissionChecker instantiation hangs, NN will hang also.
+   */
+  private FSPermissionChecker getPermissionChecker()
+      throws AccessControlException {
+    return (isPermissionEnabled) ? new FSPermissionChecker(
+        fsOwnerShortUserName, supergroup) : null;
+  }
+
+  private void checkOwner(FSPermissionChecker pc, String path)
+      throws AccessControlException {
+    checkPermission(pc, path, true, null, null, null, null);
   }
 
-  private FSPermissionChecker checkPathAccess(String path, FsAction access
+  private void checkPathAccess(FSPermissionChecker pc,
+      String path, FsAction access
       ) throws AccessControlException {
-    return checkPermission(path, false, null, null, access, null);
+    checkPermission(pc, path, false, null, null, access, null);
   }
 
-  private FSPermissionChecker checkParentAccess(String path, FsAction access
+  private void checkParentAccess(FSPermissionChecker pc,
+      String path, FsAction access
       ) throws AccessControlException {
-    return checkPermission(path, false, null, access, null, null);
+    checkPermission(pc, path, false, null, access, null, null);
   }
 
-  private FSPermissionChecker checkAncestorAccess(String path, FsAction access
+  private void checkAncestorAccess(FSPermissionChecker pc,
+      String path, FsAction access
       ) throws AccessControlException {
-    return checkPermission(path, false, access, null, null, null);
+    checkPermission(pc, path, false, access, null, null, null);
   }
 
-  private FSPermissionChecker checkTraverse(String path
+  private void checkTraverse(FSPermissionChecker pc, String path
       ) throws AccessControlException {
-    return checkPermission(path, false, null, null, null, null);
+    checkPermission(pc, path, false, null, null, null, null);
   }
 
   private void checkSuperuserPrivilege() throws AccessControlException {
     if (isPermissionEnabled) {
-      PermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
+      FSPermissionChecker pc = getPermissionChecker();
+      pc.checkSuperuserPrivilege();
     }
   }
 
@@ -5667,17 +5724,17 @@ public class FSNamesystem implements FSC
    * For more details of the parameters, see
    * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
    */
-  private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
       FsAction subAccess) throws AccessControlException {
-    FSPermissionChecker pc = new FSPermissionChecker(
-        fsOwner.getShortUserName(), supergroup);
-    if (!pc.isSuper) {
+    if (!pc.isSuperUser()) {
       dir.waitForReady();
-      pc.checkPermission(path, dir.rootDir, doCheckOwner,
-          ancestorAccess, parentAccess, access, subAccess);
+      synchronized (this) {
+        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess);
+      }
     }
-    return pc;
   }
 
   /**

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1449539&r1=1449538&r2=1449539&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Sun Feb 24 19:50:51 2013
@@ -19,25 +19,33 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
-/** Perform permission checking in {@link FSNamesystem}. */
+/** 
+ * Class that helps in checking file system permission.
+ * The state of this class need not be synchronized as it has data structures that
+ * are read-only.
+ * 
+ * Some of the helper methods are guarded by {@link FSNamesystem} intrinsic lock.
+ */
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
-private final UserGroupInformation ugi;
-  public final String user;
-  private final Set<String> groups = new HashSet<String>();
-  public final boolean isSuper;
+  private final UserGroupInformation ugi;
+  private final String user;
+  /** A set with group names. Not synchronized since it is unmodifiable */
+  private final Set<String> groups;
+  private final boolean isSuper;
   
   FSPermissionChecker(String fsOwner, String supergroup
       ) throws AccessControlException{
@@ -47,9 +55,9 @@ private final UserGroupInformation ugi;
       throw new AccessControlException(e); 
     } 
 
-    groups.addAll(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    groups = Collections.unmodifiableSet(s);
     user = ugi.getShortUserName();
-
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
 
@@ -59,20 +67,23 @@ private final UserGroupInformation ugi;
    */
   public boolean containsGroup(String group) {return groups.contains(group);}
 
+  public String getUser() {
+    return user;
+  }
+
+  public boolean isSuperUser() {
+    return isSuper;
+  }
+
   /**
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
-   * @param owner owner of the system
-   * @param supergroup supergroup of the system
    */
-  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
-                                             String supergroup) 
-                     throws AccessControlException {
-    FSPermissionChecker checker = 
-      new FSPermissionChecker(owner.getShortUserName(), supergroup);
-    if (!checker.isSuper) {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
+    if (!isSuper) {
       throw new AccessControlException("Access denied for user " 
-          + checker.user + ". Superuser privilege is required");
+          + user + ". Superuser privilege is required");
     }
   }
 
@@ -102,8 +113,10 @@ private final UserGroupInformation ugi;
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
+   * 
+   * Guarded by {@link FSNamesystem} intrinsic lock
+   * Caller of this method must hold that lock.
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@@ -142,6 +155,7 @@ private final UserGroupInformation ugi;
     }
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void checkOwner(INode inode) throws AccessControlException {
     if (inode != null && user.equals(inode.getUserName())) {
       return;
@@ -149,6 +163,7 @@ private final UserGroupInformation ugi;
     throw new AccessControlException("Permission denied");
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void checkTraverse(INode[] inodes, int last
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
@@ -156,6 +171,7 @@ private final UserGroupInformation ugi;
     }
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void checkSubAccess(INode inode, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
@@ -175,11 +191,13 @@ private final UserGroupInformation ugi;
     }
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void check(INode[] inodes, int i, FsAction access
       ) throws AccessControlException {
     check(i >= 0? inodes[i]: null, access);
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void check(INode inode, FsAction access
       ) throws AccessControlException {
     if (inode == null) {

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java?rev=1449539&r1=1449538&r2=1449539&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java Sun Feb 24 19:50:51 2013
@@ -98,7 +98,6 @@ class PermissionChecker {
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,



Mime
View raw message