hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r997940 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/
Date Thu, 16 Sep 2010 22:26:13 GMT
Author: dhruba
Date: Thu Sep 16 22:26:12 2010
New Revision: 997940

URL: http://svn.apache.org/viewvc?rev=997940&view=rev
Log:
HDFS-1093. Change the FSNamesystem lock to a read/write lock. (dhruba)


Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Sep 16 22:26:12 2010
@@ -131,6 +131,8 @@ Trunk (unreleased changes)
 
     HDFS-1383. Improve the error messages when using hftp://.  (szetszwo)
 
+    HDFS-1093. Change the FSNamesystem lock to a read/write lock. (dhruba)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Thu Sep 16 22:26:12 2010
@@ -153,8 +153,11 @@ public class BackupStorage extends FSIma
         throw new IOException("Could not locate checkpoint directories");
       StorageDirectory sdName = itImage.next();
       StorageDirectory sdEdits = itEdits.next();
-      synchronized(getFSDirectoryRootLock()) { // load image under rootDir lock
+      getFSDirectoryRootLock().writeLock();
+      try { // load image under rootDir lock
         loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      } finally {
+        getFSDirectoryRootLock().writeUnlock();
       }
       loadFSEdits(sdEdits);
     }
@@ -172,8 +175,8 @@ public class BackupStorage extends FSIma
     saveNamespace(false);
   }
 
-  private Object getFSDirectoryRootLock() {
-    return getFSNamesystem().dir.rootDir;
+  private FSDirectory getFSDirectoryRootLock() {
+    return getFSNamesystem().dir;
   }
 
   static File getJSpoolDir(StorageDirectory sd) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Thu Sep 16 22:26:12 2010
@@ -745,7 +745,8 @@ public class BlockManager {
     for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
       blocksToReplicate.add(new ArrayList<Block>());
     }
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       synchronized (neededReplications) {
         if (neededReplications.size() == 0) {
           missingBlocksInCurIter = 0;
@@ -788,7 +789,9 @@ public class BlockManager {
           }
         } // end for
       } // end synchronized neededReplication
-    } // end synchronized namesystem
+    } finally {
+      namesystem.writeUnlock();
+    }
 
     return blocksToReplicate;
   }
@@ -806,7 +809,8 @@ public class BlockManager {
     INodeFile fileINode = null;
     int additionalReplRequired;
 
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       synchronized (neededReplications) {
         // block should belong to a file
         fileINode = blocksMap.getINode(block);
@@ -853,6 +857,8 @@ public class BlockManager {
         }
 
       }
+    } finally {
+      namesystem.writeUnlock();
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
@@ -864,7 +870,8 @@ public class BlockManager {
     if(targets.length == 0)
       return false;
 
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
@@ -941,6 +948,8 @@ public class BlockManager {
           }
         }
       }
+    } finally {
+      namesystem.writeUnlock();
     }
 
     return true;
@@ -1022,7 +1031,8 @@ public class BlockManager {
   void processPendingReplications() {
     Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
-      synchronized (namesystem) {
+      namesystem.writeLock();
+      try {
         for (int i = 0; i < timedOutItems.length; i++) {
           NumberReplicas num = countNodes(timedOutItems[i]);
           if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
@@ -1033,6 +1043,8 @@ public class BlockManager {
                                    getReplication(timedOutItems[i]));
           }
         }
+      } finally {
+        namesystem.writeUnlock();
       }
       /* If we know the target datanodes where the replication timedout,
        * we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1082,6 +1094,7 @@ public class BlockManager {
                                DatanodeDescriptor node,
                                DatanodeDescriptor delNodeHint)
   throws IOException {
+    assert (namesystem.hasWriteLock());
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1212,7 +1225,8 @@ public class BlockManager {
    */
   void processMisReplicatedBlocks() {
     long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       neededReplications.clear();
       for (BlockInfo block : blocksMap.getBlocks()) {
         INodeFile fileINode = block.getINode();
@@ -1240,6 +1254,8 @@ public class BlockManager {
           processOverReplicatedBlock(block, expectedReplication, null, null);
         }
       }
+    } finally {
+      namesystem.writeUnlock();
     }
     FSNamesystem.LOG.info("Total number of blocks = " + blocksMap.size());
     FSNamesystem.LOG.info("Number of invalid blocks = " + nrInvalid);
@@ -1303,7 +1319,8 @@ public class BlockManager {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
           + block + " from " + node.getName());
     }
-    synchronized (namesystem) {
+    assert (namesystem.hasWriteLock());
+    {
       if (!blocksMap.removeNode(block, node)) {
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
@@ -1535,7 +1552,8 @@ public class BlockManager {
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
       int expectedReplicasDelta) {
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
       if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
@@ -1548,6 +1566,8 @@ public class BlockManager {
         neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
                                   oldExpectedReplicas);
       }
+    } finally {
+      namesystem.writeUnlock();
     }
   }
 
@@ -1590,7 +1610,8 @@ public class BlockManager {
    * @return number of blocks scheduled for removal during this iteration.
    */
   private int invalidateWorkForOneNode(String nodeId) {
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode())
         return 0;
@@ -1635,6 +1656,8 @@ public class BlockManager {
       }
       pendingDeletionBlocksCount -= blocksToInvalidate.size();
       return blocksToInvalidate.size();
+    } finally {
+      namesystem.writeUnlock();
     }
   }
   
@@ -1725,8 +1748,11 @@ public class BlockManager {
   }
 
   int getCapacity() {
-    synchronized(namesystem) {
+    namesystem.readLock();
+    try {
       return blocksMap.getCapacity();
+    } finally {
+      namesystem.readUnlock();
     }
   }
   

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Thu Sep 16 22:26:12 2010
@@ -57,8 +57,11 @@ class DecommissionManager {
      */
     public void run() {
       for(; fsnamesystem.isRunning(); ) {
-        synchronized(fsnamesystem) {
+        fsnamesystem.writeLock();
+        try {
           check();
+        } finally {
+          fsnamesystem.writeUnlock();
         }
   
         try {
@@ -90,4 +93,4 @@ class DecommissionManager {
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Sep 16 22:26:12 2010
@@ -24,6 +24,9 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -66,7 +69,32 @@ class FSDirectory implements Closeable {
   private volatile boolean ready = false;
   private static final long UNKNOWN_DISK_SPACE = -1;
   private final int lsLimit;  // max list limit
-  
+
+  // lock to protect BlockMap.
+  private ReentrantReadWriteLock bLock;
+  private Condition cond;
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    this.bLock.readLock().lock();
+  }
+
+  void readUnlock() {
+    this.bLock.readLock().unlock();
+  }
+
+  void writeLock() {
+    this.bLock.writeLock().lock();
+  }
+
+  void writeUnlock() {
+    this.bLock.writeLock().unlock();
+  }
+
+  boolean hasWriteLock() {
+    return this.bLock.isWriteLockedByCurrentThread();
+  }
+
   /**
    * Caches frequently used file names used in {@link INode} to reuse 
    * byte[] objects and reduce heap usage.
@@ -87,6 +115,8 @@ class FSDirectory implements Closeable {
   }
 
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
+    this.bLock = new ReentrantReadWriteLock(true); // fair
+    this.cond = bLock.writeLock().newCondition();
     fsImage.setFSNamesystem(ns);
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
@@ -134,10 +164,13 @@ class FSDirectory implements Closeable {
       fsImage.close();
       throw e;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       this.ready = true;
       this.nameCache.initialized();
-      this.notifyAll();
+      cond.signalAll();
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -158,13 +191,16 @@ class FSDirectory implements Closeable {
    */
   void waitForReady() {
     if (!ready) {
-      synchronized (this) {
+      writeLock();
+      try {
         while (!ready) {
           try {
-            this.wait(5000);
+            cond.await(5000, TimeUnit.MILLISECONDS);
           } catch (InterruptedException ie) {
           }
         }
+      } finally {
+        writeUnlock();
       }
     }
   }
@@ -196,8 +232,11 @@ class FSDirectory implements Closeable {
                                  permissions,replication,
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
-    synchronized (rootDir) {
+    writeLock();
+    try {
       newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
+    } finally {
+      writeUnlock();
     }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
@@ -234,7 +273,8 @@ class FSDirectory implements Closeable {
                               modificationTime, atime, preferredBlockSize);
       diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
     }
-    synchronized (rootDir) {
+    writeLock();
+    try {
       try {
         newNode = addNode(path, newNode, diskspace, false);
         if(newNode != null && blocks != null) {
@@ -249,7 +289,10 @@ class FSDirectory implements Closeable {
         return null;
       }
       return newNode;
+    } finally {
+      writeUnlock();
     }
+
   }
 
   INodeDirectory addToParent( byte[][] src,
@@ -286,7 +329,8 @@ class FSDirectory implements Closeable {
     }
     // add new node to the parent
     INodeDirectory newParent = null;
-    synchronized (rootDir) {
+    writeLock();
+    try {
       try {
         newParent = rootDir.addToParent(src, newNode, parentINode,
                                         false, propagateModTime);
@@ -306,6 +350,8 @@ class FSDirectory implements Closeable {
           newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
         }
       }
+    } finally {
+      writeUnlock();
     }
     return newParent;
   }
@@ -320,7 +366,8 @@ class FSDirectory implements Closeable {
   ) throws QuotaExceededException {
     waitForReady();
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       assert inodes[inodes.length-1].isUnderConstruction() :
         "INode should correspond to a file under construction";
       INodeFileUnderConstruction fileINode = 
@@ -347,6 +394,8 @@ class FSDirectory implements Closeable {
             + "file system");
       }
       return blockInfo;
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -356,13 +405,16 @@ class FSDirectory implements Closeable {
   void persistBlocks(String path, INodeFileUnderConstruction file) {
     waitForReady();
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       fsImage.getEditLog().logOpenFile(path, file);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
             +path+" with "+ file.getBlocks().length 
             +" blocks is persisted to the file system");
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -372,7 +424,8 @@ class FSDirectory implements Closeable {
   void closeFile(String path, INodeFile file) {
     waitForReady();
     long now = now();
-    synchronized (rootDir) {
+    writeLock();
+    try {
       // file is closed
       file.setModificationTimeForce(now);
       fsImage.getEditLog().logCloseFile(path, file);
@@ -381,6 +434,8 @@ class FSDirectory implements Closeable {
             +path+" with "+ file.getBlocks().length 
             +" blocks is persisted to the file system");
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -391,7 +446,8 @@ class FSDirectory implements Closeable {
                       Block block) throws IOException {
     waitForReady();
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       // modify file-> block and blocksMap
       fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
@@ -405,6 +461,8 @@ class FSDirectory implements Closeable {
             +path+" with "+block
             +" block is added to the file system");
       }
+    } finally {
+      writeUnlock();
     }
     return true;
   }
@@ -462,7 +520,8 @@ class FSDirectory implements Closeable {
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
-    synchronized (rootDir) {
+    writeLock();
+    try {
       INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
       INode srcInode = srcInodes[srcInodes.length-1];
       
@@ -560,6 +619,8 @@ class FSDirectory implements Closeable {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst);
       return false;
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -586,7 +647,8 @@ class FSDirectory implements Closeable {
       }
     }
     String error = null;
-    synchronized (rootDir) {
+    writeLock();
+    try {
       final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
       final INode srcInode = srcInodes[srcInodes.length - 1];
       // validate source
@@ -733,6 +795,8 @@ class FSDirectory implements Closeable {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + "failed to rename " + src + " to " + dst);
       throw new IOException("rename from " + src + " to " + dst + " failed.");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -764,7 +828,9 @@ class FSDirectory implements Closeable {
     }
     oldReplication[0] = -1;
     Block[] fileBlocks = null;
-    synchronized(rootDir) {
+
+    writeLock();
+    try {
       INode[] inodes = rootDir.getExistingPathINodes(src, true);
       INode inode = inodes[inodes.length - 1];
       if (inode == null) {
@@ -784,6 +850,8 @@ class FSDirectory implements Closeable {
 
       fileNode.setReplication(replication);
       fileBlocks = fileNode.getBlocks();
+    } finally {
+      writeUnlock();
     }
     return fileBlocks;
   }
@@ -795,7 +863,8 @@ class FSDirectory implements Closeable {
    */
   long getPreferredBlockSize(String filename) throws UnresolvedLinkException,
       FileNotFoundException, IOException {
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode inode = rootDir.getNode(filename, false);
       if (inode == null) {
         throw new FileNotFoundException("File does not exist: " + filename);
@@ -804,12 +873,15 @@ class FSDirectory implements Closeable {
         throw new IOException("Getting block size of non-file: "+ filename); 
       }
       return ((INodeFile)inode).getPreferredBlockSize();
+    } finally {
+      readUnlock();
     }
   }
 
   boolean exists(String src) throws UnresolvedLinkException {
     src = normalizePath(src);
-    synchronized(rootDir) {
+    readLock();
+    try {
       INode inode = rootDir.getNode(src, false);
       if (inode == null) {
          return false;
@@ -817,6 +889,8 @@ class FSDirectory implements Closeable {
       return inode.isDirectory() || inode.isLink() 
         ? true 
         : ((INodeFile)inode).getBlocks() != null;
+    } finally {
+      readUnlock();
     }
   }
 
@@ -828,12 +902,15 @@ class FSDirectory implements Closeable {
 
   void unprotectedSetPermission(String src, FsPermission permissions) 
     throws FileNotFoundException, UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
         INode inode = rootDir.getNode(src, true);
         if (inode == null) {
             throw new FileNotFoundException("File does not exist: " + src);
         }
         inode.setPermission(permissions);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -845,7 +922,8 @@ class FSDirectory implements Closeable {
 
   void unprotectedSetOwner(String src, String username, String groupname) 
     throws FileNotFoundException, UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
       INode inode = rootDir.getNode(src, true);
       if (inode == null) {
           throw new FileNotFoundException("File does not exist: " + src);
@@ -856,6 +934,8 @@ class FSDirectory implements Closeable {
       if (groupname != null) {
         inode.setGroup(groupname);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -864,13 +944,16 @@ class FSDirectory implements Closeable {
    */
   public void concatInternal(String target, String [] srcs) 
       throws UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
       // actual move
       waitForReady();
 
       unprotectedConcat(target, srcs);
       // do the commit
       fsImage.getEditLog().logConcat(target, srcs, now());
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -952,12 +1035,15 @@ class FSDirectory implements Closeable {
     if (!isDir(src)) {
       return true;
     }
-    synchronized(rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(src, false);
       assert targetNode != null : "should be taken care in isDir() above";
       if (((INodeDirectory)targetNode).getChildren().size() != 0) {
         dirNotEmpty = false;
       }
+    } finally {
+      readUnlock();
     }
     return dirNotEmpty;
   }
@@ -1004,7 +1090,8 @@ class FSDirectory implements Closeable {
       long mtime) throws UnresolvedLinkException {
     src = normalizePath(src);
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       INode[] inodes =  rootDir.getExistingPathINodes(src, false);
       INode targetNode = inodes[inodes.length-1];
 
@@ -1035,6 +1122,8 @@ class FSDirectory implements Closeable {
             +src+" is removed");
       }
       return filesRemoved;
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1052,7 +1141,8 @@ class FSDirectory implements Closeable {
   private void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
                            boolean updateDiskspace) 
       throws IOException, UnresolvedLinkException {    
-    synchronized (rootDir) {
+    writeLock();
+    try {
       long dsOld = oldnode.diskspaceConsumed();
       
       //
@@ -1089,6 +1179,8 @@ class FSDirectory implements Closeable {
         newnode.setBlock(index, info); // inode refers to the block in BlocksMap
         index++;
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1104,7 +1196,8 @@ class FSDirectory implements Closeable {
       boolean needLocation) throws UnresolvedLinkException, IOException {
     String srcs = normalizePath(src);
 
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(srcs, true);
       if (targetNode == null)
         return null;
@@ -1126,6 +1219,8 @@ class FSDirectory implements Closeable {
       }
       return new DirectoryListing(
           listing, totalNumChildren-startChild-numOfListing);
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1138,7 +1233,8 @@ class FSDirectory implements Closeable {
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
       throws UnresolvedLinkException {
     String srcs = normalizePath(src);
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(srcs, resolveLink);
       if (targetNode == null) {
         return null;
@@ -1146,6 +1242,8 @@ class FSDirectory implements Closeable {
       else {
         return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
       }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1154,7 +1252,8 @@ class FSDirectory implements Closeable {
    */
   Block[] getFileBlocks(String src) throws UnresolvedLinkException {
     waitForReady();
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(src, false);
       if (targetNode == null)
         return null;
@@ -1163,6 +1262,8 @@ class FSDirectory implements Closeable {
       if (targetNode.isLink()) 
         return null;
       return ((INodeFile)targetNode).getBlocks();
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1170,12 +1271,15 @@ class FSDirectory implements Closeable {
    * Get {@link INode} associated with the file.
    */
   INodeFile getFileINode(String src) throws UnresolvedLinkException {
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode inode = rootDir.getNode(src, true);
       if (inode == null || inode.isDirectory())
         return null;
       assert !inode.isLink();      
       return (INodeFile)inode;
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1193,8 +1297,11 @@ class FSDirectory implements Closeable {
    */
   INode[] getExistingPathINodes(String path) 
     throws UnresolvedLinkException {
-    synchronized (rootDir){
+    readLock();
+    try {
       return rootDir.getExistingPathINodes(path, true);
+    } finally {
+      readUnlock();
     }
   }
   
@@ -1203,7 +1310,8 @@ class FSDirectory implements Closeable {
    */
   boolean isValidToCreate(String src) throws UnresolvedLinkException {
     String srcs = normalizePath(src);
-    synchronized (rootDir) {
+    readLock();
+    try {
       if (srcs.startsWith("/") && 
           !srcs.endsWith("/") && 
           rootDir.getNode(srcs, false) == null) {
@@ -1211,6 +1319,8 @@ class FSDirectory implements Closeable {
       } else {
         return false;
       }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1218,9 +1328,12 @@ class FSDirectory implements Closeable {
    * Check whether the path specifies a directory
    */
   boolean isDir(String src) throws UnresolvedLinkException {
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode node = rootDir.getNode(normalizePath(src), false);
       return node != null && node.isDirectory();
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1237,7 +1350,8 @@ class FSDirectory implements Closeable {
                                          throws QuotaExceededException,
                                                 FileNotFoundException,
                                                 UnresolvedLinkException {
-    synchronized (rootDir) {
+    writeLock();
+    try {
       INode[] inodes = rootDir.getExistingPathINodes(path, false);
       int len = inodes.length;
       if (inodes[len - 1] == null) {
@@ -1245,6 +1359,8 @@ class FSDirectory implements Closeable {
                                         " does not exist under rootDir.");
       }
       updateCount(inodes, len-1, nsDelta, dsDelta, true);
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -1360,7 +1476,8 @@ class FSDirectory implements Closeable {
     byte[][] components = INode.getPathComponents(names);
     INode[] inodes = new INode[components.length];
 
-    synchronized(rootDir) {
+    writeLock();
+    try {
       rootDir.getExistingPathINodes(components, inodes, false);
 
       // find the index of the first null in inodes[]
@@ -1393,6 +1510,8 @@ class FSDirectory implements Closeable {
               "DIR* FSDirectory.mkdirs: created directory " + cur);
         }
       }
+    } finally {
+      writeUnlock();
     }
     return true;
   }
@@ -1404,11 +1523,14 @@ class FSDirectory implements Closeable {
                           UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     INode[] inodes = new INode[components.length];
-    synchronized (rootDir) {
+    writeLock();
+    try {
       rootDir.getExistingPathINodes(components, inodes, false);
       unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
           permissions, false, timestamp);
       return inodes[inodes.length-1];
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1435,10 +1557,13 @@ class FSDirectory implements Closeable {
     child.setLocalName(path);
     cacheName(child);
     INode[] inodes = new INode[components.length];
-    synchronized (rootDir) {
+    writeLock();
+    try {
       rootDir.getExistingPathINodes(components, inodes, false);
       return addChild(inodes, inodes.length-1, child, childDiskspace,
                       inheritPermission);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1597,7 +1722,8 @@ class FSDirectory implements Closeable {
   ContentSummary getContentSummary(String src) 
     throws FileNotFoundException, UnresolvedLinkException {
     String srcs = normalizePath(src);
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(srcs, false);
       if (targetNode == null) {
         throw new FileNotFoundException("File does not exist: " + srcs);
@@ -1605,6 +1731,8 @@ class FSDirectory implements Closeable {
       else {
         return targetNode.computeContentSummary();
       }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1710,7 +1838,8 @@ class FSDirectory implements Closeable {
     
     String srcs = normalizePath(src);
 
-    synchronized(rootDir) {
+    writeLock();
+    try {
       INode[] inodes = rootDir.getExistingPathINodes(src, true);
       INode targetNode = inodes[inodes.length-1];
       if (targetNode == null) {
@@ -1744,6 +1873,8 @@ class FSDirectory implements Closeable {
         }
         return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
       }
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -1755,18 +1886,24 @@ class FSDirectory implements Closeable {
   void setQuota(String src, long nsQuota, long dsQuota) 
     throws FileNotFoundException, QuotaExceededException,
     UnresolvedLinkException { 
-    synchronized (rootDir) {    
+    writeLock();
+    try {
       INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
       if (dir != null) {
         fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
                                          dir.getDsQuota());
       }
+    } finally {
+      writeUnlock();
     }
   }
   
   long totalInodes() {
-    synchronized (rootDir) {
+    readLock();
+    try {
       return rootDir.numItemsInTree();
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1781,9 +1918,12 @@ class FSDirectory implements Closeable {
 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
     throws UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
       INodeFile inode = getFileINode(src);
       return unprotectedSetTimes(src, inode, mtime, atime, force);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1940,8 +2080,11 @@ class FSDirectory implements Closeable {
       throws UnresolvedLinkException {
     INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
     try {
-      synchronized (rootDir) {
+      writeLock();
+      try {
         newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
+      } finally {
+        writeUnlock();
       }
     } catch (UnresolvedLinkException e) {
       /* All UnresolvedLinkExceptions should have been resolved by now, but we

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Sep 16 22:26:12 2010
@@ -96,6 +96,7 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -272,6 +273,9 @@ public class FSNamesystem implements FSC
   // precision of access times.
   private long accessTimePrecision = 0;
 
+  // lock to protect FSNamesystem.
+  private ReentrantReadWriteLock fsLock;
+
   /**
    * FSNamesystem constructor.
    */
@@ -292,6 +296,7 @@ public class FSNamesystem implements FSC
       throws IOException {
     this.systemStart = now();
     this.blockManager = new BlockManager(this, conf);
+    this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
     dtSecretManager = createDelegationTokenSecretManager(conf);
     this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
@@ -393,11 +398,33 @@ public class FSNamesystem implements FSC
     return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
   }
 
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    this.fsLock.readLock().lock();
+  }
+
+  void readUnlock() {
+    this.fsLock.readLock().unlock();
+  }
+
+  void writeLock() {
+    this.fsLock.writeLock().lock();
+  }
+
+  void writeUnlock() {
+    this.fsLock.writeLock().unlock();
+  }
+
+  boolean hasWriteLock() {
+    return this.fsLock.isWriteLockedByCurrentThread();
+  }
+
   /**
    * dirs is a list of directories where the filesystem directory state 
    * is stored
    */
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
+    this.fsLock = new ReentrantReadWriteLock(true);
     this.blockManager = new BlockManager(this, conf);
     setConfigurationParameters(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
@@ -538,7 +565,9 @@ public class FSNamesystem implements FSC
   /**
    * Dump all metadata into specified file
    */
-  synchronized void metaSave(String filename) throws IOException {
+  void metaSave(String filename) throws IOException {
+    writeLock();
+    try {
     checkSuperuserPrivilege();
     File file = new File(System.getProperty("hadoop.log.dir"), filename);
     PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
@@ -565,6 +594,9 @@ public class FSNamesystem implements FSC
 
     out.flush();
     out.close();
+    } finally {
+      writeUnlock();
+    }
   }
 
   long getDefaultBlockSize() {
@@ -595,8 +627,10 @@ public class FSNamesystem implements FSC
    * @param datanode on which blocks are located
    * @param size total size of blocks
    */
-  synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+  BlocksWithLocations getBlocks(DatanodeID datanode, long size)
       throws IOException {
+    readLock();
+    try {
     checkSuperuserPrivilege();
 
     DatanodeDescriptor node = getDatanode(datanode);
@@ -637,6 +671,9 @@ public class FSNamesystem implements FSC
 
     return new BlocksWithLocations(
         results.toArray(new BlockWithLocations[results.size()]));
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -673,9 +710,11 @@ public class FSNamesystem implements FSC
    * Set permissions for an existing file.
    * @throws IOException
    */
-  public synchronized void setPermission(String src, FsPermission permission)
+  public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot set permission for " + src, safeMode);
     checkOwner(src);
@@ -687,15 +726,20 @@ public class FSNamesystem implements FSC
                     Server.getRemoteIp(),
                     "setPermission", src, null, stat);
     }
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Set owner for an existing file.
    * @throws IOException
    */
-  public synchronized void setOwner(String src, String username, String group)
+  public void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if (isInSafeMode())
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
     FSPermissionChecker pc = checkOwner(src);
@@ -716,6 +760,9 @@ public class FSNamesystem implements FSC
                     Server.getRemoteIp(),
                     "setOwner", src, null, stat);
     }
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -767,25 +814,53 @@ public class FSNamesystem implements FSC
     return ret;
   }
 
-  private synchronized LocatedBlocks getBlockLocationsInternal(String src,
+  private LocatedBlocks getBlockLocationsInternal(String src,
                                                        long offset, 
                                                        long length,
                                                        boolean doAccessTime, 
                                                        boolean needBlockToken)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
-    INodeFile inode = dir.getFileINode(src);
-    if (inode == null)
-      throw new FileNotFoundException("File does not exist: " + src);
-    assert !inode.isLink();
-    if (doAccessTime && isAccessTimeSupported()) {
-      dir.setTimes(src, inode, -1, now(), false);
+
+    for (int attempt = 0; attempt < 2; attempt++) {
+      if (attempt == 0) { // first attempt is with readlock
+        readLock();
+      }  else { // second attempt is with  write lock
+        writeLock(); // writelock is needed to set accesstime
+      }
+      try {
+        long now = now();
+        INodeFile inode = dir.getFileINode(src);
+        if (inode == null) {
+          throw new FileNotFoundException("File does not exist: " + src);
+        }
+        assert !inode.isLink();
+        if (doAccessTime && isAccessTimeSupported()) {
+          if (now > inode.getAccessTime() + getAccessTimePrecision()) {
+            // the access time is stale and has to be set; if we only hold the
+            // readlock, restart this entire operation with the writeLock.
+            if (attempt == 0) {
+              continue;
+            }
+          }
+          dir.setTimes(src, inode, -1, now, false);
+        }
+        return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+      } finally {
+        if (attempt == 0) {
+          readUnlock();
+        } else {
+          writeUnlock();
+        }
+      }
     }
-    return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+    return null; // can never reach here
   }
   
-  synchronized LocatedBlocks getBlockLocationsInternal(INodeFile inode,
+  LocatedBlocks getBlockLocationsInternal(INodeFile inode,
       long offset, long length, boolean needBlockToken)
   throws IOException {
+    readLock();
+    try {
     final BlockInfo[] blocks = inode.getBlocks();
     if (LOG.isDebugEnabled()) {
       LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -818,6 +893,9 @@ public class FSNamesystem implements FSC
           blockManager.getBlockLocation(last, n), false);
       }
     }
+    } finally {
+      readUnlock();
+    }
   }
 
   /** Generate block tokens for the blocks to be returned. */
@@ -869,7 +947,8 @@ public class FSNamesystem implements FSC
       }
     }
     
-    synchronized(this) {
+    writeLock();
+    try {
       // write permission for the target
       if (isPermissionEnabled) {
         checkPathAccess(target, FsAction.WRITE);
@@ -964,6 +1043,8 @@ public class FSNamesystem implements FSC
       }
 
       dir.concatInternal(target,srcs);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
    
@@ -982,12 +1063,14 @@ public class FSNamesystem implements FSC
    * The access time is precise upto an hour. The transaction, if needed, is
    * written to the edits log but is not flushed.
    */
-  public synchronized void setTimes(String src, long mtime, long atime) 
+  public void setTimes(String src, long mtime, long atime) 
     throws IOException, UnresolvedLinkException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set dfs.support.accessTime configuration parameter.");
     }
+    writeLock();
+    try {
     //
     // The caller needs to have write access to set access & modification times.
     if (isPermissionEnabled) {
@@ -1005,18 +1088,26 @@ public class FSNamesystem implements FSC
     } else {
       throw new FileNotFoundException("File " + src + " does not exist.");
     }
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Create a symbolic link.
    */
-  public synchronized void createSymlink(String target, String link,
+  public void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    writeLock();
+    try {
     if (!createParent) {
       verifyParentDir(link);
     }
     createSymlinkInternal(target, link, dirPerms, createParent);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(link, false);
@@ -1029,9 +1120,11 @@ public class FSNamesystem implements FSC
   /**
    * Create a symbolic link.
    */
-  private synchronized void createSymlinkInternal(String target, String link,
+  private void createSymlinkInternal(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
+    writeLock();
+    try {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" + 
         target + " link=" + link);
@@ -1055,6 +1148,9 @@ public class FSNamesystem implements FSC
 
     // add symbolic link to namespace
     dir.addSymlink(link, target, dirPerms, createParent);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -1082,9 +1178,11 @@ public class FSNamesystem implements FSC
     return status;
   }
 
-  private synchronized boolean setReplicationInternal(String src,
+  private boolean setReplicationInternal(String src,
       short replication) throws AccessControlException, QuotaExceededException,
       SafeModeException, UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot set replication for " + src, safeMode);
     blockManager.verifyReplication(src, replication, null);
@@ -1116,6 +1214,9 @@ public class FSNamesystem implements FSC
           + ". New replication is " + replication);
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
     
   long getPreferredBlockSize(String filename) 
@@ -1183,12 +1284,14 @@ public class FSNamesystem implements FSC
    * 
    * @return the last block locations if the block is partial or null otherwise
    */
-  private synchronized LocatedBlock startFileInternal(String src,
+  private LocatedBlock startFileInternal(String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    writeLock();
+    try {
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
     boolean append = flag.contains(CreateFlag.APPEND);
     boolean create = flag.contains(CreateFlag.CREATE);
@@ -1371,6 +1474,9 @@ public class FSNamesystem implements FSC
                                    +ie.getMessage());
       throw ie;
     }
+    } finally {
+      writeUnlock();
+    }
     return null;
   }
 
@@ -1438,7 +1544,8 @@ public class FSNamesystem implements FSC
           +src+" for "+clientName);
     }
 
-    synchronized (this) {
+    writeLock();
+    try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
@@ -1461,6 +1568,8 @@ public class FSNamesystem implements FSC
       blockSize = pendingFile.getPreferredBlockSize();
       clientNode = pendingFile.getClientNode();
       replication = (int)pendingFile.getReplication();
+    } finally {
+      writeUnlock();
     }
 
     // choose targets for the new block to be allocated.
@@ -1473,7 +1582,8 @@ public class FSNamesystem implements FSC
     }
 
     // Allocate a new block and record it in the INode. 
-    synchronized (this) {
+    writeLock();
+    try {
       INode[] pathINodes = dir.getExistingPathINodes(src);
       int inodesLen = pathINodes.length;
       checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1490,6 +1600,8 @@ public class FSNamesystem implements FSC
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
       }      
+    } finally {
+      writeUnlock();
     }
         
     // Create next block
@@ -1504,9 +1616,11 @@ public class FSNamesystem implements FSC
   /**
    * The client would like to let go of the given block
    */
-  public synchronized boolean abandonBlock(Block b, String src, String holder)
+  public boolean abandonBlock(Block b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     //
     // Remove the block from the pending creates list
     //
@@ -1522,6 +1636,9 @@ public class FSNamesystem implements FSC
                                     + " is removed from pendingCreates");
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
   
   // make sure that we still have the lease on this file.
@@ -1571,9 +1688,11 @@ public class FSNamesystem implements FSC
     return success ;
   }
 
-  private synchronized boolean completeFileInternal(String src, 
+  private boolean completeFileInternal(String src, 
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
           src + " for " + holder);
@@ -1594,6 +1713,9 @@ public class FSNamesystem implements FSC
     NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
                                   + " is closed by " + holder);
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
 
   /** 
@@ -1638,7 +1760,9 @@ public class FSNamesystem implements FSC
    * replicated.  If not, return false. If checkall is true, then check
    * all blocks, otherwise check only penultimate block.
    */
-  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+  boolean checkFileProgress(INodeFile v, boolean checkall) {
+    writeLock();
+    try {
     if (checkall) {
       //
       // check all blocks of the file.
@@ -1664,6 +1788,9 @@ public class FSNamesystem implements FSC
       }
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
 
 
@@ -1672,9 +1799,15 @@ public class FSNamesystem implements FSC
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
    */
-  public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
+  public void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
     throws IOException {
-    blockManager.findAndMarkBlockAsCorrupt(blk, dn);
+    writeLock();
+    try {
+      blockManager.findAndMarkBlockAsCorrupt(blk, dn);
+    } finally {
+      writeUnlock();
+    }
+
   }
 
 
@@ -1709,8 +1842,11 @@ public class FSNamesystem implements FSC
 
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
-  private synchronized boolean renameToInternal(String src, String dst)
+  private boolean renameToInternal(String src, String dst)
     throws IOException, UnresolvedLinkException {
+
+    writeLock();
+    try {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
@@ -1736,6 +1872,9 @@ public class FSNamesystem implements FSC
       return true;
     }
     return false;
+    } finally {
+      writeUnlock();
+    }
   }
   
 
@@ -1755,8 +1894,10 @@ public class FSNamesystem implements FSC
     }
   }
 
-  private synchronized void renameToInternal(String src, String dst,
+  private void renameToInternal(String src, String dst,
       Options.Rename... options) throws IOException {
+    writeLock();
+    try {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -1775,6 +1916,9 @@ public class FSNamesystem implements FSC
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
     changeLease(src, dst, dinfo); // update lease with new filename
+    } finally {
+      writeUnlock();
+    }
   }
   
   /**
@@ -1817,7 +1961,9 @@ public class FSNamesystem implements FSC
       UnresolvedLinkException, IOException{
     boolean deleteNow = false;
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
-    synchronized(this) {
+
+    writeLock();
+    try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
@@ -1832,6 +1978,8 @@ public class FSNamesystem implements FSC
       if (deleteNow) { // Perform small deletes right away
         removeBlocks(collectedBlocks);
       }
+    } finally {
+      writeUnlock();
     }
     // Log directory deletion to editlog
     getEditLog().logSync();
@@ -1853,10 +2001,13 @@ public class FSNamesystem implements FSC
     while (start < blocks.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
       end = end > blocks.size() ? blocks.size() : end;
-      synchronized(this) {
+      writeLock();
+      try {
         for (int i=start; i<end; i++) {
           blockManager.removeBlock(blocks.get(i));
         }
+      } finally {
+        writeUnlock();
       }
       start = end;
     }
@@ -1913,9 +2064,11 @@ public class FSNamesystem implements FSC
   /**
    * Create all the necessary directories
    */
-  private synchronized boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    writeLock();
+    try {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
@@ -1949,6 +2102,9 @@ public class FSNamesystem implements FSC
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
@@ -1985,12 +2141,15 @@ public class FSNamesystem implements FSC
 
     NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
                                   + src + " for " + clientName);
-    synchronized (this) {
+    writeLock();
+    try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot fsync file " + src, safeMode);
       }
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
       dir.persistBlocks(src, pendingFile);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -2157,10 +2316,13 @@ public class FSNamesystem implements FSC
     checkReplicationFactor(newFile);
   }
 
-  synchronized void commitBlockSynchronization(Block lastblock,
+  void commitBlockSynchronization(Block lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException, UnresolvedLinkException {
+    String src = "";
+    writeLock();
+    try {
     LOG.info("commitBlockSynchronization(lastblock=" + lastblock
           + ", newgenerationstamp=" + newgenerationstamp
           + ", newlength=" + newlength
@@ -2222,7 +2384,7 @@ public class FSNamesystem implements FSC
 
     // If this commit does not want to close the file, persist
     // blocks only if append is supported and return
-    String src = leaseManager.findPath(pendingFile);
+    src = leaseManager.findPath(pendingFile);
     if (!closeFile) {
       if (supportAppends) {
         dir.persistBlocks(src, pendingFile);
@@ -2237,6 +2399,9 @@ public class FSNamesystem implements FSC
 
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     LOG.info("commitBlockSynchronization(newblock=" + lastblock
           + ", file=" + src
@@ -2314,8 +2479,10 @@ public class FSNamesystem implements FSC
    * 
    * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
    */
-  public synchronized void registerDatanode(DatanodeRegistration nodeReg
+  public void registerDatanode(DatanodeRegistration nodeReg
                                             ) throws IOException {
+    writeLock();
+    try {
     String dnAddress = Server.getRemoteAddress();
     if (dnAddress == null) {
       // Mostly called inside an RPC.
@@ -2429,6 +2596,9 @@ public class FSNamesystem implements FSC
       // because its is done when the descriptor is created
     }
     return;
+    } finally {
+      writeUnlock();
+    }
   }
     
   /* Resolve a node's network location */
@@ -2687,11 +2857,13 @@ public class FSNamesystem implements FSC
     workFound = blockManager.computeReplicationWork(blocksToProcess);
     
     // Update FSNamesystemMetrics counters
-    synchronized (this) {
+    writeLock();
+    try {
       blockManager.updateState();
       blockManager.scheduledReplicationBlocksCount = workFound;
+    } finally {
+      writeUnlock();
     }
-    
     workFound += blockManager.computeInvalidateWork(nodesToProcess);
     return workFound;
   }
@@ -2704,8 +2876,10 @@ public class FSNamesystem implements FSC
    * remove a datanode descriptor
    * @param nodeID datanode ID
    */
-  synchronized public void removeDatanode(DatanodeID nodeID) 
+  public void removeDatanode(DatanodeID nodeID) 
     throws IOException {
+    writeLock();
+    try {
     DatanodeDescriptor nodeInfo = getDatanode(nodeID);
     if (nodeInfo != null) {
       removeDatanode(nodeInfo);
@@ -2713,6 +2887,9 @@ public class FSNamesystem implements FSC
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
                                    + nodeID.getName() + " does not exist");
     }
+    } finally {
+      writeUnlock();
+    }
   }
   
   /**
@@ -2815,7 +2992,8 @@ public class FSNamesystem implements FSC
 
       // acquire the fsnamesystem lock, and then remove the dead node.
       if (foundDead) {
-        synchronized (this) {
+        writeLock();
+        try {
           synchronized(heartbeats) {
             synchronized (datanodeMap) {
               DatanodeDescriptor nodeInfo = null;
@@ -2831,6 +3009,8 @@ public class FSNamesystem implements FSC
               }
             }
           }
+        } finally {
+          writeUnlock();
         }
       }
       allAlive = !foundDead;
@@ -2841,9 +3021,12 @@ public class FSNamesystem implements FSC
    * The given node is reporting all its blocks.  Use this info to 
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public synchronized void processReport(DatanodeID nodeID, 
+  public void processReport(DatanodeID nodeID, 
                                          BlockListAsLongs newReport
                                         ) throws IOException {
+
+    writeLock();
+    try {
     long startTime = now();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2864,6 +3047,9 @@ public class FSNamesystem implements FSC
     
     blockManager.processReport(node, newReport);
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -2971,10 +3157,12 @@ public class FSNamesystem implements FSC
   /**
    * The given node is reporting that it received a certain block.
    */
-  public synchronized void blockReceived(DatanodeID nodeID,  
+  public void blockReceived(DatanodeID nodeID,  
                                          Block block,
                                          String delHint
                                          ) throws IOException {
+    writeLock();
+    try {
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null || !node.isAlive) {
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
@@ -2995,6 +3183,9 @@ public class FSNamesystem implements FSC
     }
 
     blockManager.addBlock(node, block, delHint);
+    } finally {
+      writeUnlock();
+    }
   }
 
   public long getMissingBlocksCount() {
@@ -3080,16 +3271,17 @@ public class FSNamesystem implements FSC
     return getDatanodeListForReport(type).size(); 
   }
 
-  private synchronized ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-                                                      DatanodeReportType type) {                  
-    
+  private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
+                                                      DatanodeReportType type) {
     boolean listLiveNodes = type == DatanodeReportType.ALL ||
                             type == DatanodeReportType.LIVE;
     boolean listDeadNodes = type == DatanodeReportType.ALL ||
                             type == DatanodeReportType.DEAD;
 
     HashMap<String, String> mustList = new HashMap<String, String>();
-    
+
+    readLock();
+    try {
     if (listDeadNodes) {
       //first load all the nodes listed in include and exclude files.
       for (Iterator<String> it = hostsReader.getHosts().iterator(); 
@@ -3132,10 +3324,15 @@ public class FSNamesystem implements FSC
     }
     
     return nodes;
+    } finally {
+      readUnlock();
+    }
   }
 
-  public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
+  public DatanodeInfo[] datanodeReport( DatanodeReportType type
       ) throws AccessControlException {
+    readLock();
+    try {
     checkSuperuserPrivilege();
 
     ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
@@ -3144,6 +3341,9 @@ public class FSNamesystem implements FSC
       arr[i] = new DatanodeInfo(results.get(i));
     }
     return arr;
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -3154,7 +3354,9 @@ public class FSNamesystem implements FSC
    * @throws AccessControlException if superuser privilege is violated.
    * @throws IOException if 
    */
-  synchronized void saveNamespace() throws AccessControlException, IOException {
+  void saveNamespace() throws AccessControlException, IOException {
+    writeLock();
+    try {
     checkSuperuserPrivilege();
     if(!isInSafeMode()) {
       throw new IOException("Safe mode should be turned ON " +
@@ -3162,6 +3364,9 @@ public class FSNamesystem implements FSC
     }
     getFSImage().saveNamespace(true);
     LOG.info("New namespace image has been created.");
+    } finally {
+      writeUnlock();
+    }
   }
   
   /**
@@ -3170,7 +3375,9 @@ public class FSNamesystem implements FSC
    * 
    * @throws AccessControlException if superuser privilege is violated.
    */
-  synchronized boolean restoreFailedStorage(String arg) throws AccessControlException {
+  boolean restoreFailedStorage(String arg) throws AccessControlException {
+    writeLock();
+    try {
     checkSuperuserPrivilege();
     
     // if it is disabled - enable it and vice versa.
@@ -3181,13 +3388,17 @@ public class FSNamesystem implements FSC
     getFSImage().setRestoreFailedStorage(val);
     
     return val;
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    */
-  public synchronized void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
+  public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
                                           ArrayList<DatanodeDescriptor> dead) {
-
+    readLock();
+    try {
     ArrayList<DatanodeDescriptor> results = 
                             getDatanodeListForReport(DatanodeReportType.ALL);    
     for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
@@ -3197,12 +3408,17 @@ public class FSNamesystem implements FSC
       else
         live.add(node);
     }
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
    * Prints information about all datanodes.
    */
-  private synchronized void datanodeDump(PrintWriter out) {
+  private void datanodeDump(PrintWriter out) {
+    readLock();
+    try {
     synchronized (datanodeMap) {
       out.println("Metasave: Number of datanodes: " + datanodeMap.size());
       for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
@@ -3210,6 +3426,9 @@ public class FSNamesystem implements FSC
         out.println(node.dumpDatanode());
       }
     }
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -3352,7 +3571,8 @@ public class FSNamesystem implements FSC
     hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                 conf.get("dfs.hosts.exclude", ""));
     hostsReader.refresh();
-    synchronized (this) {
+    writeLock();
+    try {
       for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
            it.hasNext();) {
         DatanodeDescriptor node = it.next();
@@ -3373,8 +3593,9 @@ public class FSNamesystem implements FSC
           }
         }
       }
-    } 
-      
+    } finally {
+      writeUnlock();
+    }
   }
     
   void finalizeUpgrade() throws IOException {
@@ -3390,8 +3611,9 @@ public class FSNamesystem implements FSC
    * Returns TRUE if node is registered (including when it is on the 
    * exclude list and is being decommissioned). 
    */
-  private synchronized boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) 
+  private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) 
     throws IOException {
+    assert (hasWriteLock());
     if (!inHostsList(nodeReg, ipAddr)) {
       return false;    
     }
@@ -3889,7 +4111,9 @@ public class FSNamesystem implements FSC
    * Enter safe mode manually.
    * @throws IOException
    */
-  synchronized void enterSafeMode() throws IOException {
+  void enterSafeMode() throws IOException {
+    writeLock();
+    try {
     // Ensure that any concurrent operations have been fully synced
     // before entering safe mode. This ensures that the FSImage
     // is entirely stable on disk as soon as we're in safe mode.
@@ -3901,13 +4125,18 @@ public class FSNamesystem implements FSC
     safeMode.setManual();
     NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
                                 + safeMode.getTurnOffTip());
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Leave safe mode.
    * @throws IOException
    */
-  synchronized void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+  void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+    writeLock();
+    try {
     if (!isInSafeMode()) {
       NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
       return;
@@ -3916,50 +4145,78 @@ public class FSNamesystem implements FSC
       throw new SafeModeException("Distributed upgrade is in progress",
                                   safeMode);
     safeMode.leave(checkForUpgrades);
+    } finally {
+      writeUnlock();
+    }
   }
     
-  synchronized String getSafeModeTip() {
+  String getSafeModeTip() {
+    readLock();
+    try {
     if (!isInSafeMode())
       return "";
     return safeMode.getTurnOffTip();
+    } finally {
+      readUnlock();
+    }
   }
 
   long getEditLogSize() throws IOException {
     return getEditLog().getEditLogSize();
   }
 
-  synchronized CheckpointSignature rollEditLog() throws IOException {
+  CheckpointSignature rollEditLog() throws IOException {
+    writeLock();
+    try {
     if (isInSafeMode()) {
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
     }
     LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
     return getFSImage().rollEditLog();
+    } finally {
+      writeUnlock();
+    }
   }
 
-  synchronized void rollFSImage() throws IOException {
+  void rollFSImage() throws IOException {
+    writeLock();
+    try {
     if (isInSafeMode()) {
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
     }
     LOG.info("Roll FSImage from " + Server.getRemoteAddress());
     getFSImage().rollFSImage();
+    } finally {
+      writeUnlock();
+    }
   }
 
-  synchronized NamenodeCommand startCheckpoint(
+  NamenodeCommand startCheckpoint(
                                 NamenodeRegistration bnReg, // backup node
                                 NamenodeRegistration nnReg) // active name-node
   throws IOException {
+    writeLock();
+    try {
     LOG.info("Start checkpoint for " + bnReg.getAddress());
     NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
     getEditLog().logSync();
     return cmd;
+    } finally {
+      writeUnlock();
+    }
   }
 
-  synchronized void endCheckpoint(NamenodeRegistration registration,
+  void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    writeLock();
+    try {
     LOG.info("End checkpoint for " + registration.getAddress());
     getFSImage().endCheckpoint(sig, registration.getRole());
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -4048,8 +4305,13 @@ public class FSNamesystem implements FSC
         fsOwner.getShortUserName(), supergroup);
     if (!pc.isSuper) {
       dir.waitForReady();
-      pc.checkPermission(path, dir.rootDir, doCheckOwner,
-          ancestorAccess, parentAccess, access, subAccess);
+      readLock();
+      try {
+        pc.checkPermission(path, dir.rootDir, doCheckOwner,
+            ancestorAccess, parentAccess, access, subAccess);
+      } finally {
+        readUnlock();
+      } 
     }
     return pc;
   }
@@ -4251,8 +4513,10 @@ public class FSNamesystem implements FSC
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  synchronized LocatedBlock updateBlockForPipeline(Block block, 
+  LocatedBlock updateBlockForPipeline(Block block, 
       String clientName) throws IOException {
+    writeLock();
+    try {
     // check vadility of parameters
     checkUCBlock(block, clientName);
     
@@ -4264,6 +4528,9 @@ public class FSNamesystem implements FSC
           block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return locatedBlock;
+    } finally {
+      writeUnlock();
+    }
   }
   
   
@@ -4276,9 +4543,11 @@ public class FSNamesystem implements FSC
    * @param newNodes datanodes in the pipeline
    * @throws IOException if any error occurs
    */
-  synchronized void updatePipeline(String clientName, Block oldBlock, 
+  void updatePipeline(String clientName, Block oldBlock, 
       Block newBlock, DatanodeID[] newNodes)
       throws IOException {
+    writeLock();
+    try {
     assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
     + oldBlock + " has different block identifier";
     LOG.info("updatePipeline(block=" + oldBlock
@@ -4325,6 +4594,9 @@ public class FSNamesystem implements FSC
     }
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
     return;
+    } finally {
+      writeUnlock();
+    }
   }
 
   // rename was successful. If any part of the renamed subtree had
@@ -4393,8 +4665,10 @@ public class FSNamesystem implements FSC
    * @param registration
    * @throws IOException
    */
-  synchronized void registerBackupNode(NamenodeRegistration registration)
+  void registerBackupNode(NamenodeRegistration registration)
   throws IOException {
+    writeLock();
+    try {
     if(getFSImage().getNamespaceID() != registration.getNamespaceID())
       throw new IOException("Incompatible namespaceIDs: " 
           + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
@@ -4404,6 +4678,9 @@ public class FSNamesystem implements FSC
     if(!regAllowed)
       throw new IOException("Registration is not allowed. " +
           "Another node is registered as a backup.");
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -4413,14 +4690,19 @@ public class FSNamesystem implements FSC
    * @param registration
    * @throws IOException
    */
-  synchronized void releaseBackupNode(NamenodeRegistration registration)
+  void releaseBackupNode(NamenodeRegistration registration)
   throws IOException {
+    writeLock();
+    try {
     if(getFSImage().getNamespaceID() != registration.getNamespaceID())
       throw new IOException("Incompatible namespaceIDs: " 
           + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
           + "; " + registration.getRole() +
               " node namespaceID = " + registration.getNamespaceID());
     getEditLog().releaseBackupStream(registration);
+    } finally {
+      writeUnlock();
+    }
   }
 
   public int numCorruptReplicas(Block blk) {
@@ -4473,9 +4755,11 @@ public class FSNamesystem implements FSC
    * @throws AccessControlException
    * @throws IOException
    */
-  synchronized Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
+  Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
       String startBlockAfter) throws AccessControlException, IOException {
 
+    readLock();
+    try {
     checkSuperuserPrivilege();
     long startBlockId = 0;
     // print a limited # of corrupt files per call
@@ -4502,9 +4786,14 @@ public class FSNamesystem implements FSC
     }
     LOG.info("list corrupt file blocks returned: " + count);
     return corruptFiles;
+    } finally {
+      readUnlock();
+    }
   }
   
-  public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+  public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+    readLock();
+    try {
     ArrayList<DatanodeDescriptor> decommissioningNodes = 
         new ArrayList<DatanodeDescriptor>();
     ArrayList<DatanodeDescriptor> results = 
@@ -4516,6 +4805,9 @@ public class FSNamesystem implements FSC
       }
     }
     return decommissioningNodes;
+    } finally {
+      readUnlock();
+    }
   }
 
   /*
@@ -4644,8 +4936,11 @@ public class FSNamesystem implements FSC
    */
   private void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logGetDelegationToken(id, expiryTime);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }
@@ -4658,8 +4953,11 @@ public class FSNamesystem implements FSC
    */
   private void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logRenewDelegationToken(id, expiryTime);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }
@@ -4672,8 +4970,11 @@ public class FSNamesystem implements FSC
    */
   private void logCancelDelegationToken(DelegationTokenIdentifier id)
       throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logCancelDelegationToken(id);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }
@@ -4684,8 +4985,11 @@ public class FSNamesystem implements FSC
    * @param key new delegation key.
    */
   public void logUpdateMasterKey(DelegationKey key) throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logUpdateMasterKey(key);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Thu Sep 16 22:26:12 2010
@@ -120,7 +120,6 @@ class FSPermissionChecker {
           + ", subAccess=" + subAccess);
     }
     // check if (parentAccess != null) && file exists, then check sb
-    synchronized(root) {
       // Resolve symlinks, the check is performed on the link target.
       INode[] inodes = root.getExistingPathINodes(path, true);
       int ancestorIndex = inodes.length - 2;
@@ -147,7 +146,6 @@ class FSPermissionChecker {
       if (doCheckOwner) {
         checkOwner(inodes[inodes.length - 1]);
       }
-    }
   }
 
   private void checkOwner(INode inode) throws AccessControlException {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=997940&r1=997939&r2=997940&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Thu Sep 16 22:26:12 2010
@@ -366,12 +366,16 @@ public class LeaseManager {
     /** Check leases periodically. */
     public void run() {
       for(; fsnamesystem.isRunning(); ) {
-        synchronized(fsnamesystem) {
+        fsnamesystem.writeLock();
+        try {
           if (!fsnamesystem.isInSafeMode()) {
             checkLeases();
           }
+        } finally {
+          fsnamesystem.writeUnlock();
         }
 
+
         try {
           Thread.sleep(2000);
         } catch(InterruptedException ie) {



Mime
View raw message