hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r998286 [2/2] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/ ...
Date Fri, 17 Sep 2010 19:51:28 GMT
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 17 19:51:27 2010
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -95,6 +96,7 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -273,6 +275,9 @@ public class FSNamesystem implements FSC
   // precision of access times.
   private long accessTimePrecision = 0;
 
+  // lock to protect FSNamesystem.
+  private ReentrantReadWriteLock fsLock;
+
   /**
    * FSNamesystem constructor.
    */
@@ -293,6 +298,7 @@ public class FSNamesystem implements FSC
       throws IOException {
     this.systemStart = now();
     this.blockManager = new BlockManager(this, conf);
+    this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
     dtSecretManager = createDelegationTokenSecretManager(conf);
     this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
@@ -394,11 +400,33 @@ public class FSNamesystem implements FSC
     return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
   }
 
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    this.fsLock.readLock().lock();
+  }
+
+  void readUnlock() {
+    this.fsLock.readLock().unlock();
+  }
+
+  void writeLock() {
+    this.fsLock.writeLock().lock();
+  }
+
+  void writeUnlock() {
+    this.fsLock.writeLock().unlock();
+  }
+
+  boolean hasWriteLock() {
+    return this.fsLock.isWriteLockedByCurrentThread();
+  }
+
   /**
    * dirs is a list of directories where the filesystem directory state 
    * is stored
    */
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
+    this.fsLock = new ReentrantReadWriteLock(true);
     this.blockManager = new BlockManager(this, conf);
     setConfigurationParameters(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
@@ -539,7 +567,9 @@ public class FSNamesystem implements FSC
   /**
    * Dump all metadata into specified file
    */
-  synchronized void metaSave(String filename) throws IOException {
+  void metaSave(String filename) throws IOException {
+    writeLock();
+    try {
     checkSuperuserPrivilege();
     File file = new File(System.getProperty("hadoop.log.dir"), filename);
     PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
@@ -566,6 +596,9 @@ public class FSNamesystem implements FSC
 
     out.flush();
     out.close();
+    } finally {
+      writeUnlock();
+    }
   }
 
   long getDefaultBlockSize() {
@@ -596,8 +629,10 @@ public class FSNamesystem implements FSC
    * @param datanode on which blocks are located
    * @param size total size of blocks
    */
-  synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+  BlocksWithLocations getBlocks(DatanodeID datanode, long size)
       throws IOException {
+    readLock();
+    try {
     checkSuperuserPrivilege();
 
     DatanodeDescriptor node = getDatanode(datanode);
@@ -638,6 +673,9 @@ public class FSNamesystem implements FSC
 
     return new BlocksWithLocations(
         results.toArray(new BlockWithLocations[results.size()]));
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -674,9 +712,11 @@ public class FSNamesystem implements FSC
    * Set permissions for an existing file.
    * @throws IOException
    */
-  public synchronized void setPermission(String src, FsPermission permission)
+  public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot set permission for " + src, safeMode);
     checkOwner(src);
@@ -688,15 +728,20 @@ public class FSNamesystem implements FSC
                     Server.getRemoteIp(),
                     "setPermission", src, null, stat);
     }
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Set owner for an existing file.
    * @throws IOException
    */
-  public synchronized void setOwner(String src, String username, String group)
+  public void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if (isInSafeMode())
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
     FSPermissionChecker pc = checkOwner(src);
@@ -717,6 +762,9 @@ public class FSNamesystem implements FSC
                     Server.getRemoteIp(),
                     "setOwner", src, null, stat);
     }
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -768,25 +816,53 @@ public class FSNamesystem implements FSC
     return ret;
   }
 
-  private synchronized LocatedBlocks getBlockLocationsInternal(String src,
+  private LocatedBlocks getBlockLocationsInternal(String src,
                                                        long offset, 
                                                        long length,
                                                        boolean doAccessTime, 
                                                        boolean needBlockToken)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
-    INodeFile inode = dir.getFileINode(src);
-    if (inode == null)
-      throw new FileNotFoundException("File does not exist: " + src);
-    assert !inode.isLink();
-    if (doAccessTime && isAccessTimeSupported()) {
-      dir.setTimes(src, inode, -1, now(), false);
+
+    for (int attempt = 0; attempt < 2; attempt++) {
+      if (attempt == 0) { // first attempt is with readlock
+        readLock();
+      }  else { // second attempt is with  write lock
+        writeLock(); // writelock is needed to set accesstime
+      }
+      try {
+        long now = now();
+        INodeFile inode = dir.getFileINode(src);
+        if (inode == null) {
+          throw new FileNotFoundException("File does not exist: " + src);
+        }
+        assert !inode.isLink();
+        if (doAccessTime && isAccessTimeSupported()) {
+          if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
+            // if we have to set access time but we only have the readlock, then
+            // restart this entire operation with the writeLock.
+            if (attempt == 0) {
+              continue;
+            }
+          }
+          dir.setTimes(src, inode, -1, now, false);
+        }
+        return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+      } finally {
+        if (attempt == 0) {
+          readUnlock();
+        } else {
+          writeUnlock();
+        }
+      }
     }
-    return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+    return null; // can never reach here
   }
   
-  synchronized LocatedBlocks getBlockLocationsInternal(INodeFile inode,
+  LocatedBlocks getBlockLocationsInternal(INodeFile inode,
       long offset, long length, boolean needBlockToken)
   throws IOException {
+    readLock();
+    try {
     final BlockInfo[] blocks = inode.getBlocks();
     if (LOG.isDebugEnabled()) {
       LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -801,33 +877,27 @@ public class FSNamesystem implements FSC
     } else {
       final long n = inode.computeFileSize(false);
       final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
-          blocks, offset, length, Integer.MAX_VALUE, needBlockToken);
+          blocks, offset, length, Integer.MAX_VALUE);
       final BlockInfo last = inode.getLastBlock();
       if (LOG.isDebugEnabled()) {
         LOG.debug("last = " + last);
       }
+
+      if(isBlockTokenEnabled && needBlockToken) {
+        setBlockTokens(locatedblocks);
+      }
+
       if (last.isComplete()) {
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-          blockManager.getBlockLocation(last, n-last.getNumBytes(), needBlockToken), true);
+          blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
       } else {
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-          blockManager.getBlockLocation(last, n, needBlockToken), false);
+          blockManager.getBlockLocation(last, n), false);
       }
     }
-  }
-
-  /** Create a LocatedBlock. 
-   * @param needBlockToken */
-  LocatedBlock createLocatedBlock(final Block b,
-      final DatanodeInfo[] locations, final long offset, final boolean corrupt,
-      boolean needBlockToken) throws IOException {
-    final ExtendedBlock blk = getExtendedBlock(b);
-    final LocatedBlock lb = new LocatedBlock(blk, locations, offset, corrupt);
-    if (isBlockTokenEnabled && needBlockToken) {
-      lb.setBlockToken(blockTokenSecretManager.generateToken(blk,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
+    } finally {
+      readUnlock();
     }
-    return lb;
   }
 
   /** Create a LocatedBlock. */
@@ -836,6 +906,17 @@ public class FSNamesystem implements FSC
     return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
   }
   
+  /** Generate block tokens for the blocks to be returned. */
+  private void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
+    for(LocatedBlock l : locatedBlocks) {
+      Token<BlockTokenIdentifier> token = 
+        blockTokenSecretManager.generateToken(l.getBlock(), 
+            EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
+    
+      l.setBlockToken(token);
+    }
+  }
+
   /**
    * Moves all the blocks from srcs and appends them to trg
    * To avoid rollbacks we will verify validitity of ALL of the args
@@ -874,7 +955,8 @@ public class FSNamesystem implements FSC
       }
     }
     
-    synchronized(this) {
+    writeLock();
+    try {
       // write permission for the target
       if (isPermissionEnabled) {
         checkPathAccess(target, FsAction.WRITE);
@@ -969,6 +1051,8 @@ public class FSNamesystem implements FSC
       }
 
       dir.concatInternal(target,srcs);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
    
@@ -987,12 +1071,14 @@ public class FSNamesystem implements FSC
    * The access time is precise upto an hour. The transaction, if needed, is
    * written to the edits log but is not flushed.
    */
-  public synchronized void setTimes(String src, long mtime, long atime) 
+  public void setTimes(String src, long mtime, long atime) 
     throws IOException, UnresolvedLinkException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set dfs.support.accessTime configuration parameter.");
     }
+    writeLock();
+    try {
     //
     // The caller needs to have write access to set access & modification times.
     if (isPermissionEnabled) {
@@ -1010,18 +1096,26 @@ public class FSNamesystem implements FSC
     } else {
       throw new FileNotFoundException("File " + src + " does not exist.");
     }
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Create a symbolic link.
    */
-  public synchronized void createSymlink(String target, String link,
+  public void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    writeLock();
+    try {
     if (!createParent) {
       verifyParentDir(link);
     }
     createSymlinkInternal(target, link, dirPerms, createParent);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(link, false);
@@ -1034,9 +1128,11 @@ public class FSNamesystem implements FSC
   /**
    * Create a symbolic link.
    */
-  private synchronized void createSymlinkInternal(String target, String link,
+  private void createSymlinkInternal(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
+    writeLock();
+    try {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" + 
         target + " link=" + link);
@@ -1060,6 +1156,9 @@ public class FSNamesystem implements FSC
 
     // add symbolic link to namespace
     dir.addSymlink(link, target, dirPerms, createParent);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -1087,9 +1186,11 @@ public class FSNamesystem implements FSC
     return status;
   }
 
-  private synchronized boolean setReplicationInternal(String src,
+  private boolean setReplicationInternal(String src,
       short replication) throws AccessControlException, QuotaExceededException,
       SafeModeException, UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot set replication for " + src, safeMode);
     blockManager.verifyReplication(src, replication, null);
@@ -1121,6 +1222,9 @@ public class FSNamesystem implements FSC
           + ". New replication is " + replication);
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
     
   long getPreferredBlockSize(String filename) 
@@ -1173,14 +1277,29 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * For description of exceptions @see {@link ClientProtocol#create()}
+   * Create new or open an existing file for append.<p>
+   * 
+   * In case of opening the file for append, the method returns the last
+   * block of the file if this is a partial block, which can still be used
+   * for writing more data. The client uses the returned block locations
+   * to form the data pipeline for this block.<br>
+   * The method returns null if the last block is full or if this is a 
+   * new file. The client then allocates a new block with the next call
+   * using {@link NameNode#addBlock()}.<p>
+   *
+   * For description of parameters and exceptions thrown see 
+   * {@link ClientProtocol#create()}
+   * 
+   * @return the last block locations if the block is partial or null otherwise
    */
-  private synchronized void startFileInternal(String src,
+  private LocatedBlock startFileInternal(String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    writeLock();
+    try {
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
     boolean append = flag.contains(CreateFlag.APPEND);
     boolean create = flag.contains(CreateFlag.CREATE);
@@ -1290,9 +1409,8 @@ public class FSNamesystem implements FSC
               + src + " on client " + clientMachine);
           else {
             //append & create a nonexist file equals to overwrite
-            this.startFileInternal(src, permissions, holder, clientMachine,
+            return startFileInternal(src, permissions, holder, clientMachine,
                 EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
-            return;
           }
         } else if (myFile.isDirectory()) {
           throw new IOException("failed to append to directory " + src 
@@ -1330,6 +1448,15 @@ public class FSNamesystem implements FSC
         dir.replaceNode(src, node, cons);
         leaseManager.addLease(cons.getClientName(), src);
 
+        // convert last block to under-construction
+        LocatedBlock lb = 
+          blockManager.convertLastBlockToUnderConstruction(cons);
+
+        if (lb != null && isBlockTokenEnabled) {
+          lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
+        }
+        return lb;
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1355,6 +1482,10 @@ public class FSNamesystem implements FSC
                                    +ie.getMessage());
       throw ie;
     }
+    } finally {
+      writeUnlock();
+    }
+    return null;
   }
 
   /**
@@ -1368,52 +1499,12 @@ public class FSNamesystem implements FSC
       throw new UnsupportedOperationException("Append to hdfs not supported." +
                             " Please refer to dfs.support.append configuration parameter.");
     }
-    startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), 
-                      false, (short)blockManager.maxReplication, (long)0);
+    LocatedBlock lb = 
+      startFileInternal(src, null, holder, clientMachine, 
+                        EnumSet.of(CreateFlag.APPEND), 
+                        false, (short)blockManager.maxReplication, (long)0);
     getEditLog().logSync();
 
-    //
-    // Create a LocatedBlock object for the last block of the file
-    // to be returned to the client. Return null if the file does not
-    // have a partial block at the end.
-    //
-    LocatedBlock lb = null;
-    synchronized (this) {
-      INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-      BlockInfo lastBlock = file.getLastBlock();
-      if (lastBlock != null) {
-        assert lastBlock == blockManager.getStoredBlock(lastBlock) :
-          "last block of the file is not in blocksMap";
-        if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
-          long fileLength = file.computeContentSummary().getLength();
-          DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
-          // remove the replica locations of this block from the node
-          for (int i = 0; i < targets.length; i++) {
-            targets[i].removeBlock(lastBlock);
-          }
-          // convert last block to under-construction and set its locations
-          blockManager.convertLastBlockToUnderConstruction(file, targets);
-
-          lb = new LocatedBlock(getExtendedBlock(lastBlock), targets, 
-                                fileLength-lastBlock.getNumBytes());
-          if (isBlockTokenEnabled) {
-            lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
-                EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
-          }
-
-          // Remove block from replication queue.
-          blockManager.updateNeededReplications(lastBlock, 0, 0);
-
-          // remove this block from the list of pending blocks to be deleted. 
-          // This reduces the possibility of triggering HADOOP-1349.
-          //
-          for (DatanodeDescriptor dd : targets) {
-            String datanodeId = dd.getStorageID();
-            blockManager.removeFromInvalidates(datanodeId, lastBlock);
-          }
-        }
-      }
-    }
     if (lb != null) {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1466,7 +1557,8 @@ public class FSNamesystem implements FSC
           +src+" for "+clientName);
     }
 
-    synchronized (this) {
+    writeLock();
+    try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
@@ -1490,6 +1582,8 @@ public class FSNamesystem implements FSC
       blockSize = pendingFile.getPreferredBlockSize();
       clientNode = pendingFile.getClientNode();
       replication = (int)pendingFile.getReplication();
+    } finally {
+      writeUnlock();
     }
 
     // choose targets for the new block to be allocated.
@@ -1502,7 +1596,8 @@ public class FSNamesystem implements FSC
     }
 
     // Allocate a new block and record it in the INode. 
-    synchronized (this) {
+    writeLock();
+    try {
       INode[] pathINodes = dir.getExistingPathINodes(src);
       int inodesLen = pathINodes.length;
       checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1519,6 +1614,8 @@ public class FSNamesystem implements FSC
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
       }      
+    } finally {
+      writeUnlock();
     }
         
     // Create next block
@@ -1533,10 +1630,11 @@ public class FSNamesystem implements FSC
   /**
    * The client would like to let go of the given block
    */
-  public synchronized boolean abandonBlock(ExtendedBlock b, String src, String holder)
+  public boolean abandonBlock(ExtendedBlock b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
-    checkBlock(b);
+    writeLock();
+    try {
     //
     // Remove the block from the pending creates list
     //
@@ -1552,6 +1650,9 @@ public class FSNamesystem implements FSC
                                     + " is removed from pendingCreates");
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
   
   // make sure that we still have the lease on this file.
@@ -1603,9 +1704,11 @@ public class FSNamesystem implements FSC
     return success ;
   }
 
-  private synchronized boolean completeFileInternal(String src, 
+  private boolean completeFileInternal(String src, 
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
+    writeLock();
+    try {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
           src + " for " + holder);
@@ -1626,6 +1729,9 @@ public class FSNamesystem implements FSC
     NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
                                   + " is closed by " + holder);
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
 
   /** 
@@ -1670,7 +1776,9 @@ public class FSNamesystem implements FSC
    * replicated.  If not, return false. If checkall is true, then check
    * all blocks, otherwise check only penultimate block.
    */
-  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+  boolean checkFileProgress(INodeFile v, boolean checkall) {
+    writeLock();
+    try {
     if (checkall) {
       //
       // check all blocks of the file.
@@ -1696,6 +1804,9 @@ public class FSNamesystem implements FSC
       }
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
 
 
@@ -1704,10 +1815,14 @@ public class FSNamesystem implements FSC
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
    */
-  public synchronized void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
+  public void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
     throws IOException {
-    checkBlock(blk);
-    blockManager.findAndMarkBlockAsCorrupt(ExtendedBlock.getLocalBlock(blk), dn);
+    writeLock();
+    try {
+      blockManager.findAndMarkBlockAsCorrupt(blk.getLocalBlock(), dn);
+    } finally {
+      writeUnlock();
+    }
   }
 
 
@@ -1742,8 +1857,11 @@ public class FSNamesystem implements FSC
 
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
-  private synchronized boolean renameToInternal(String src, String dst)
+  private boolean renameToInternal(String src, String dst)
     throws IOException, UnresolvedLinkException {
+
+    writeLock();
+    try {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
@@ -1769,6 +1887,9 @@ public class FSNamesystem implements FSC
       return true;
     }
     return false;
+    } finally {
+      writeUnlock();
+    }
   }
   
 
@@ -1788,8 +1909,10 @@ public class FSNamesystem implements FSC
     }
   }
 
-  private synchronized void renameToInternal(String src, String dst,
+  private void renameToInternal(String src, String dst,
       Options.Rename... options) throws IOException {
+    writeLock();
+    try {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -1808,6 +1931,9 @@ public class FSNamesystem implements FSC
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
     changeLease(src, dst, dinfo); // update lease with new filename
+    } finally {
+      writeUnlock();
+    }
   }
   
   /**
@@ -1850,7 +1976,9 @@ public class FSNamesystem implements FSC
       UnresolvedLinkException, IOException{
     boolean deleteNow = false;
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
-    synchronized(this) {
+
+    writeLock();
+    try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
@@ -1865,6 +1993,8 @@ public class FSNamesystem implements FSC
       if (deleteNow) { // Perform small deletes right away
         removeBlocks(collectedBlocks);
       }
+    } finally {
+      writeUnlock();
     }
     // Log directory deletion to editlog
     getEditLog().logSync();
@@ -1886,10 +2016,13 @@ public class FSNamesystem implements FSC
     while (start < blocks.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
       end = end > blocks.size() ? blocks.size() : end;
-      synchronized(this) {
+      writeLock();
+      try {
         for (int i=start; i<end; i++) {
           blockManager.removeBlock(blocks.get(i));
         }
+      } finally {
+        writeUnlock();
       }
       start = end;
     }
@@ -1946,9 +2079,11 @@ public class FSNamesystem implements FSC
   /**
    * Create all the necessary directories
    */
-  private synchronized boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    writeLock();
+    try {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
@@ -1982,6 +2117,9 @@ public class FSNamesystem implements FSC
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
+    } finally {
+      writeUnlock();
+    }
   }
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
@@ -2018,12 +2156,15 @@ public class FSNamesystem implements FSC
 
     NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
                                   + src + " for " + clientName);
-    synchronized (this) {
+    writeLock();
+    try {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot fsync file " + src, safeMode);
       }
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
       dir.persistBlocks(src, pendingFile);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -2190,10 +2331,13 @@ public class FSNamesystem implements FSC
     checkReplicationFactor(newFile);
   }
 
-  synchronized void commitBlockSynchronization(ExtendedBlock lastblock,
+  void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException, UnresolvedLinkException {
+    String src = "";
+    writeLock();
+    try {
     LOG.info("commitBlockSynchronization(lastblock=" + lastblock
           + ", newgenerationstamp=" + newgenerationstamp
           + ", newlength=" + newlength
@@ -2256,7 +2400,7 @@ public class FSNamesystem implements FSC
 
     // If this commit does not want to close the file, persist
     // blocks only if append is supported and return
-    String src = leaseManager.findPath(pendingFile);
+    src = leaseManager.findPath(pendingFile);
     if (!closeFile) {
       if (supportAppends) {
         dir.persistBlocks(src, pendingFile);
@@ -2271,6 +2415,9 @@ public class FSNamesystem implements FSC
 
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     LOG.info("commitBlockSynchronization(newblock=" + lastblock
           + ", file=" + src
@@ -2348,8 +2495,10 @@ public class FSNamesystem implements FSC
    * 
    * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
    */
-  public synchronized void registerDatanode(DatanodeRegistration nodeReg
+  public void registerDatanode(DatanodeRegistration nodeReg
                                             ) throws IOException {
+    writeLock();
+    try {
     String dnAddress = Server.getRemoteAddress();
     if (dnAddress == null) {
       // Mostly called inside an RPC.
@@ -2463,6 +2612,9 @@ public class FSNamesystem implements FSC
       // because its is done when the descriptor is created
     }
     return;
+    } finally {
+      writeUnlock();
+    }
   }
     
   /* Resolve a node's network location */
@@ -2721,11 +2873,13 @@ public class FSNamesystem implements FSC
     workFound = blockManager.computeReplicationWork(blocksToProcess);
     
     // Update FSNamesystemMetrics counters
-    synchronized (this) {
+    writeLock();
+    try {
       blockManager.updateState();
       blockManager.scheduledReplicationBlocksCount = workFound;
+    } finally {
+      writeUnlock();
     }
-    
     workFound += blockManager.computeInvalidateWork(nodesToProcess);
     return workFound;
   }
@@ -2738,8 +2892,10 @@ public class FSNamesystem implements FSC
    * remove a datanode descriptor
    * @param nodeID datanode ID
    */
-  synchronized public void removeDatanode(DatanodeID nodeID) 
+  public void removeDatanode(DatanodeID nodeID) 
     throws IOException {
+    writeLock();
+    try {
     DatanodeDescriptor nodeInfo = getDatanode(nodeID);
     if (nodeInfo != null) {
       removeDatanode(nodeInfo);
@@ -2747,6 +2903,9 @@ public class FSNamesystem implements FSC
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
                                    + nodeID.getName() + " does not exist");
     }
+    } finally {
+      writeUnlock();
+    }
   }
   
   /**
@@ -2849,7 +3008,8 @@ public class FSNamesystem implements FSC
 
       // acquire the fsnamesystem lock, and then remove the dead node.
       if (foundDead) {
-        synchronized (this) {
+        writeLock();
+        try {
           synchronized(heartbeats) {
             synchronized (datanodeMap) {
               DatanodeDescriptor nodeInfo = null;
@@ -2865,6 +3025,8 @@ public class FSNamesystem implements FSC
               }
             }
           }
+        } finally {
+          writeUnlock();
         }
       }
       allAlive = !foundDead;
@@ -2875,9 +3037,11 @@ public class FSNamesystem implements FSC
    * The given node is reporting all its blocks.  Use this info to 
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public synchronized void processReport(DatanodeID nodeID, String poolId,
+  public void processReport(DatanodeID nodeID, String poolId,
       BlockListAsLongs newReport) throws IOException {
-    checkPoolId(poolId);
+
+    writeLock();
+    try {
     long startTime = now();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2898,6 +3062,9 @@ public class FSNamesystem implements FSC
     
     blockManager.processReport(node, newReport);
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -3005,12 +3172,13 @@ public class FSNamesystem implements FSC
   /**
    * The given node is reporting that it received a certain block.
    */
-  public synchronized void blockReceived(DatanodeID nodeID,  
+  public void blockReceived(DatanodeID nodeID,  
                                          String poolId,
                                          Block block,
                                          String delHint
                                          ) throws IOException {
-    checkPoolId(poolId);
+    writeLock();
+    try {
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null || !node.isAlive) {
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
@@ -3031,6 +3199,9 @@ public class FSNamesystem implements FSC
     }
 
     blockManager.addBlock(node, block, delHint);
+    } finally {
+      writeUnlock();
+    }
   }
 
   private void checkPoolId(String thatPoolId) throws IOException {
@@ -3130,16 +3301,17 @@ public class FSNamesystem implements FSC
     return getDatanodeListForReport(type).size(); 
   }
 
-  private synchronized ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-                                                      DatanodeReportType type) {                  
-    
+  private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
+                                                      DatanodeReportType type) {
     boolean listLiveNodes = type == DatanodeReportType.ALL ||
                             type == DatanodeReportType.LIVE;
     boolean listDeadNodes = type == DatanodeReportType.ALL ||
                             type == DatanodeReportType.DEAD;
 
     HashMap<String, String> mustList = new HashMap<String, String>();
-    
+
+    readLock();
+    try {
     if (listDeadNodes) {
       //first load all the nodes listed in include and exclude files.
       for (Iterator<String> it = hostsReader.getHosts().iterator(); 
@@ -3182,10 +3354,15 @@ public class FSNamesystem implements FSC
     }
     
     return nodes;
+    } finally {
+      readUnlock();
+    }
   }
 
-  public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
+  public DatanodeInfo[] datanodeReport( DatanodeReportType type
       ) throws AccessControlException {
+    readLock();
+    try {
     checkSuperuserPrivilege();
 
     ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
@@ -3194,6 +3371,9 @@ public class FSNamesystem implements FSC
       arr[i] = new DatanodeInfo(results.get(i));
     }
     return arr;
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -3204,7 +3384,9 @@ public class FSNamesystem implements FSC
    * @throws AccessControlException if superuser privilege is violated.
    * @throws IOException if 
    */
-  synchronized void saveNamespace() throws AccessControlException, IOException {
+  void saveNamespace() throws AccessControlException, IOException {
+    writeLock();
+    try {
     checkSuperuserPrivilege();
     if(!isInSafeMode()) {
       throw new IOException("Safe mode should be turned ON " +
@@ -3212,6 +3394,9 @@ public class FSNamesystem implements FSC
     }
     getFSImage().saveNamespace(true);
     LOG.info("New namespace image has been created.");
+    } finally {
+      writeUnlock();
+    }
   }
   
   /**
@@ -3220,7 +3405,9 @@ public class FSNamesystem implements FSC
    * 
    * @throws AccessControlException if superuser privilege is violated.
    */
-  synchronized boolean restoreFailedStorage(String arg) throws AccessControlException {
+  boolean restoreFailedStorage(String arg) throws AccessControlException {
+    writeLock();
+    try {
     checkSuperuserPrivilege();
     
     // if it is disabled - enable it and vice versa.
@@ -3231,13 +3418,17 @@ public class FSNamesystem implements FSC
     getFSImage().setRestoreFailedStorage(val);
     
     return val;
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    */
-  public synchronized void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
+  public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
                                           ArrayList<DatanodeDescriptor> dead) {
-
+    readLock();
+    try {
     ArrayList<DatanodeDescriptor> results = 
                             getDatanodeListForReport(DatanodeReportType.ALL);    
     for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
@@ -3247,12 +3438,17 @@ public class FSNamesystem implements FSC
       else
         live.add(node);
     }
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
    * Prints information about all datanodes.
    */
-  private synchronized void datanodeDump(PrintWriter out) {
+  private void datanodeDump(PrintWriter out) {
+    readLock();
+    try {
     synchronized (datanodeMap) {
       out.println("Metasave: Number of datanodes: " + datanodeMap.size());
       for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
@@ -3260,6 +3456,9 @@ public class FSNamesystem implements FSC
         out.println(node.dumpDatanode());
       }
     }
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -3402,7 +3601,8 @@ public class FSNamesystem implements FSC
     hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                 conf.get("dfs.hosts.exclude", ""));
     hostsReader.refresh();
-    synchronized (this) {
+    writeLock();
+    try {
       for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
            it.hasNext();) {
         DatanodeDescriptor node = it.next();
@@ -3423,8 +3623,9 @@ public class FSNamesystem implements FSC
           }
         }
       }
-    } 
-      
+    } finally {
+      writeUnlock();
+    }
   }
     
   void finalizeUpgrade() throws IOException {
@@ -3440,8 +3641,9 @@ public class FSNamesystem implements FSC
    * Returns TRUE if node is registered (including when it is on the 
    * exclude list and is being decommissioned). 
    */
-  private synchronized boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) 
+  private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) 
     throws IOException {
+    assert (hasWriteLock());
     if (!inHostsList(nodeReg, ipAddr)) {
       return false;    
     }
@@ -3939,7 +4141,9 @@ public class FSNamesystem implements FSC
    * Enter safe mode manually.
    * @throws IOException
    */
-  synchronized void enterSafeMode() throws IOException {
+  void enterSafeMode() throws IOException {
+    writeLock();
+    try {
     // Ensure that any concurrent operations have been fully synced
     // before entering safe mode. This ensures that the FSImage
     // is entirely stable on disk as soon as we're in safe mode.
@@ -3951,13 +4155,18 @@ public class FSNamesystem implements FSC
     safeMode.setManual();
     NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
                                 + safeMode.getTurnOffTip());
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Leave safe mode.
    * @throws IOException
    */
-  synchronized void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+  void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+    writeLock();
+    try {
     if (!isInSafeMode()) {
       NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
       return;
@@ -3966,50 +4175,78 @@ public class FSNamesystem implements FSC
       throw new SafeModeException("Distributed upgrade is in progress",
                                   safeMode);
     safeMode.leave(checkForUpgrades);
+    } finally {
+      writeUnlock();
+    }
   }
     
-  synchronized String getSafeModeTip() {
+  String getSafeModeTip() {
+    readLock();
+    try {
     if (!isInSafeMode())
       return "";
     return safeMode.getTurnOffTip();
+    } finally {
+      readUnlock();
+    }
   }
 
   long getEditLogSize() throws IOException {
     return getEditLog().getEditLogSize();
   }
 
-  synchronized CheckpointSignature rollEditLog() throws IOException {
+  CheckpointSignature rollEditLog() throws IOException {
+    writeLock();
+    try {
     if (isInSafeMode()) {
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
     }
     LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
     return getFSImage().rollEditLog();
+    } finally {
+      writeUnlock();
+    }
   }
 
-  synchronized void rollFSImage() throws IOException {
+  void rollFSImage() throws IOException {
+    writeLock();
+    try {
     if (isInSafeMode()) {
       throw new SafeModeException("Checkpoint not created",
                                   safeMode);
     }
     LOG.info("Roll FSImage from " + Server.getRemoteAddress());
     getFSImage().rollFSImage();
+    } finally {
+      writeUnlock();
+    }
   }
 
-  synchronized NamenodeCommand startCheckpoint(
+  NamenodeCommand startCheckpoint(
                                 NamenodeRegistration bnReg, // backup node
                                 NamenodeRegistration nnReg) // active name-node
   throws IOException {
+    writeLock();
+    try {
     LOG.info("Start checkpoint for " + bnReg.getAddress());
     NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
     getEditLog().logSync();
     return cmd;
+    } finally {
+      writeUnlock();
+    }
   }
 
-  synchronized void endCheckpoint(NamenodeRegistration registration,
+  void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    writeLock();
+    try {
     LOG.info("End checkpoint for " + registration.getAddress());
     getFSImage().endCheckpoint(sig, registration.getRole());
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -4098,8 +4335,13 @@ public class FSNamesystem implements FSC
         fsOwner.getShortUserName(), supergroup);
     if (!pc.isSuper) {
       dir.waitForReady();
-      pc.checkPermission(path, dir.rootDir, doCheckOwner,
-          ancestorAccess, parentAccess, access, subAccess);
+      readLock();
+      try {
+        pc.checkPermission(path, dir.rootDir, doCheckOwner,
+            ancestorAccess, parentAccess, access, subAccess);
+      } finally {
+        readUnlock();
+      } 
     }
     return pc;
   }
@@ -4301,9 +4543,10 @@ public class FSNamesystem implements FSC
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  synchronized LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
+  LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
       String clientName) throws IOException {
-    checkBlock(block);
+    writeLock();
+    try {
     // check vadility of parameters
     checkUCBlock(block, clientName);
     
@@ -4315,6 +4558,9 @@ public class FSNamesystem implements FSC
           block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return locatedBlock;
+    } finally {
+      writeUnlock();
+    }
   }
   
   
@@ -4327,12 +4573,11 @@ public class FSNamesystem implements FSC
    * @param newNodes datanodes in the pipeline
    * @throws IOException if any error occurs
    */
-  synchronized void updatePipeline(String clientName, ExtendedBlock oldBlock,
+  void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
-    checkBlock(oldBlock);
-    checkBlock(newBlock);
-    
+    writeLock();
+    try {
     assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
     + oldBlock + " has different block identifier";
     LOG.info("updatePipeline(block=" + oldBlock
@@ -4379,6 +4624,9 @@ public class FSNamesystem implements FSC
     }
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
     return;
+    } finally {
+      writeUnlock();
+    }
   }
 
   // rename was successful. If any part of the renamed subtree had
@@ -4447,8 +4695,10 @@ public class FSNamesystem implements FSC
    * @param registration
    * @throws IOException
    */
-  synchronized void registerBackupNode(NamenodeRegistration registration)
+  void registerBackupNode(NamenodeRegistration registration)
   throws IOException {
+    writeLock();
+    try {
     if(getFSImage().getNamespaceID() != registration.getNamespaceID())
       throw new IOException("Incompatible namespaceIDs: " 
           + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
@@ -4458,6 +4708,9 @@ public class FSNamesystem implements FSC
     if(!regAllowed)
       throw new IOException("Registration is not allowed. " +
           "Another node is registered as a backup.");
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -4467,14 +4720,19 @@ public class FSNamesystem implements FSC
    * @param registration
    * @throws IOException
    */
-  synchronized void releaseBackupNode(NamenodeRegistration registration)
+  void releaseBackupNode(NamenodeRegistration registration)
   throws IOException {
+    writeLock();
+    try {
     if(getFSImage().getNamespaceID() != registration.getNamespaceID())
       throw new IOException("Incompatible namespaceIDs: " 
           + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
           + "; " + registration.getRole() +
               " node namespaceID = " + registration.getNamespaceID());
     getEditLog().releaseBackupStream(registration);
+    } finally {
+      writeUnlock();
+    }
   }
 
   public int numCorruptReplicas(Block blk) {
@@ -4527,9 +4785,11 @@ public class FSNamesystem implements FSC
    * @throws AccessControlException
    * @throws IOException
    */
-  synchronized Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
+  Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
       String startBlockAfter) throws AccessControlException, IOException {
 
+    readLock();
+    try {
     checkSuperuserPrivilege();
     long startBlockId = 0;
     // print a limited # of corrupt files per call
@@ -4556,9 +4816,14 @@ public class FSNamesystem implements FSC
     }
     LOG.info("list corrupt file blocks returned: " + count);
     return corruptFiles;
+    } finally {
+      readUnlock();
+    }
   }
   
-  public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+  public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+    readLock();
+    try {
     ArrayList<DatanodeDescriptor> decommissioningNodes = 
         new ArrayList<DatanodeDescriptor>();
     ArrayList<DatanodeDescriptor> results = 
@@ -4570,6 +4835,9 @@ public class FSNamesystem implements FSC
       }
     }
     return decommissioningNodes;
+    } finally {
+      readUnlock();
+    }
   }
 
   /*
@@ -4698,8 +4966,11 @@ public class FSNamesystem implements FSC
    */
   private void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logGetDelegationToken(id, expiryTime);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }
@@ -4712,8 +4983,11 @@ public class FSNamesystem implements FSC
    */
   private void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logRenewDelegationToken(id, expiryTime);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }
@@ -4726,8 +5000,11 @@ public class FSNamesystem implements FSC
    */
   private void logCancelDelegationToken(DelegationTokenIdentifier id)
       throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logCancelDelegationToken(id);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }
@@ -4738,8 +5015,11 @@ public class FSNamesystem implements FSC
    * @param key new delegation key.
    */
   public void logUpdateMasterKey(DelegationKey key) throws IOException {
-    synchronized (this) {
+    writeLock();
+    try {
       getEditLog().logUpdateMasterKey(key);
+    } finally {
+      writeUnlock();
     }
     getEditLog().logSync();
   }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Sep 17 19:51:27 2010
@@ -120,7 +120,6 @@ class FSPermissionChecker {
           + ", subAccess=" + subAccess);
     }
     // check if (parentAccess != null) && file exists, then check sb
-    synchronized(root) {
       // Resolve symlinks, the check is performed on the link target.
       INode[] inodes = root.getExistingPathINodes(path, true);
       int ancestorIndex = inodes.length - 2;
@@ -147,7 +146,6 @@ class FSPermissionChecker {
       if (doCheckOwner) {
         checkOwner(inodes[inodes.length - 1]);
       }
-    }
   }
 
   private void checkOwner(INode inode) throws AccessControlException {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Fri Sep 17 19:51:27 2010
@@ -120,9 +120,9 @@ public class FileDataServlet extends Dfs
               response.getWriter().println(e.toString());
             }
           } else if (info == null) {
-            response.sendError(400, "cat: File not found " + path);
+            response.sendError(400, "File not found " + path);
           } else {
-            response.sendError(400, "cat: " + path + ": is a directory");
+            response.sendError(400, path + ": is a directory");
           }
           return null;
         }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Sep 17 19:51:27 2010
@@ -366,12 +366,16 @@ public class LeaseManager {
     /** Check leases periodically. */
     public void run() {
       for(; fsnamesystem.isRunning(); ) {
-        synchronized(fsnamesystem) {
+        fsnamesystem.writeLock();
+        try {
           if (!fsnamesystem.isInSafeMode()) {
             checkLeases();
           }
+        } finally {
+          fsnamesystem.writeUnlock();
         }
 
+
         try {
           Thread.sleep(2000);
         } catch(InterruptedException ie) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java Fri Sep 17 19:51:27 2010
@@ -131,9 +131,11 @@ public class ListPathsServlet extends Df
     throws ServletException, IOException {
     final PrintWriter out = response.getWriter();
     final XMLOutputter doc = new XMLOutputter(out, "UTF-8");
+
+    final Map<String, String> root = buildRoot(request, doc);
+    final String path = root.get("path");
+
     try {
-      final Map<String, String> root = buildRoot(request, doc);
-      final String path = root.get("path");
       final boolean recur = "yes".equals(root.get("recursive"));
       final Pattern filter = Pattern.compile(root.get("filter"));
       final Pattern exclude = Pattern.compile(root.get("exclude"));
@@ -191,14 +193,18 @@ public class ListPathsServlet extends Df
               writeXml(re, p, doc);
             }
           }
-          doc.endDocument();
           return null;
         }
       });
+    } catch(IOException ioe) {
+      writeXml(ioe, path, doc);
     } catch (InterruptedException e) {
       LOG.warn("ListPathServlet encountered InterruptedException", e);
       response.sendError(400, e.getMessage());
     } finally {
+      if (doc != null) {
+        doc.endDocument();
+      }
       if (out != null) {
         out.close();
       }

Propchange: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:987665-997035
+/hadoop/hdfs/trunk/src/test/hdfs:987665-998256

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Sep 17 19:51:27 2010
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.channels.FileChannel;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Random;
@@ -56,6 +57,7 @@ import org.apache.hadoop.net.DNSToSwitch
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.util.StringUtils;
@@ -879,6 +881,22 @@ public class MiniDFSCluster {
   }
 
   /**
+   *  @return a {@link HftpFileSystem} object as specified user. 
+   */
+  public HftpFileSystem getHftpFileSystemAs(final String username,
+      final Configuration conf, final String... groups
+      ) throws IOException, InterruptedException {
+    final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+        username, groups);
+    return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
+      @Override
+      public HftpFileSystem run() throws Exception {
+        return getHftpFileSystem();
+      }
+    });
+  }
+
+  /**
    * Get the directories where the namenode stores its image.
    */
   public Collection<URI> getNameDirs() {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Sep 17 19:51:27 2010
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -179,90 +182,6 @@ public class TestDistributedFileSystem {
   }
   
   @Test
-  public void testFileChecksum() throws IOException {
-    ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
-
-    final long seed = RAN.nextLong();
-    System.out.println("seed=" + seed);
-    RAN.setSeed(seed);
-
-    final Configuration conf = getTestConfiguration();
-    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
-
-    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
-    final FileSystem hdfs = cluster.getFileSystem();
-    final String hftpuri = "hftp://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
-    System.out.println("hftpuri=" + hftpuri);
-    final FileSystem hftp = new Path(hftpuri).getFileSystem(conf);
-
-    final String dir = "/filechecksum";
-    final int block_size = 1024;
-    final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
-
-    //try different number of blocks
-    for(int n = 0; n < 5; n++) {
-      //generate random data
-      final byte[] data = new byte[RAN.nextInt(block_size/2-1)+n*block_size+1];
-      RAN.nextBytes(data);
-      System.out.println("data.length=" + data.length);
-  
-      //write data to a file
-      final Path foo = new Path(dir, "foo" + n);
-      {
-        final FSDataOutputStream out = hdfs.create(foo, false, buffer_size,
-            (short)2, block_size);
-        out.write(data);
-        out.close();
-      }
-      
-      //compute checksum
-      final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
-      System.out.println("hdfsfoocs=" + hdfsfoocs);
-      
-      final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
-      System.out.println("hftpfoocs=" + hftpfoocs);
-
-      final Path qualified = new Path(hftpuri + dir, "foo" + n);
-      final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
-      System.out.println("qfoocs=" + qfoocs);
-
-      //write another file
-      final Path bar = new Path(dir, "bar" + n);
-      {
-        final FSDataOutputStream out = hdfs.create(bar, false, buffer_size,
-            (short)2, block_size);
-        out.write(data);
-        out.close();
-      }
-  
-      { //verify checksum
-        final FileChecksum barcs = hdfs.getFileChecksum(bar);
-        final int barhashcode = barcs.hashCode();
-        assertEquals(hdfsfoocs.hashCode(), barhashcode);
-        assertEquals(hdfsfoocs, barcs);
-
-        assertEquals(hftpfoocs.hashCode(), barhashcode);
-        assertEquals(hftpfoocs, barcs);
-
-        assertEquals(qfoocs.hashCode(), barhashcode);
-        assertEquals(qfoocs, barcs);
-      }
-    }
-    cluster.shutdown();
-  }
-  
-  @Test
-  public void testAllWithDualPort() throws Exception {
-    dualPortTesting = true;
-
-    testFileSystemCloseAll();
-    testDFSClose();
-    testDFSClient();
-    testFileChecksum();
-  }
-  
-  @Test
   public void testStatistics() throws Exception {
     int lsLimit = 2;
     final Configuration conf = getTestConfiguration();
@@ -359,4 +278,100 @@ public class TestDistributedFileSystem {
     assertEquals(writeOps, DFSTestUtil.getStatistics(fs).getWriteOps());
     assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps());
   }
+
+  @Test
+  public void testFileChecksum() throws Exception {
+    ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
+
+    final long seed = RAN.nextLong();
+    System.out.println("seed=" + seed);
+    RAN.setSeed(seed);
+
+    final Configuration conf = getTestConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
+
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    final FileSystem hdfs = cluster.getFileSystem();
+    final String hftpuri = "hftp://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    System.out.println("hftpuri=" + hftpuri);
+    final FileSystem hftp = new Path(hftpuri).getFileSystem(conf);
+
+    final String dir = "/filechecksum";
+    final int block_size = 1024;
+    final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
+
+    //try different number of blocks
+    for(int n = 0; n < 5; n++) {
+      //generate random data
+      final byte[] data = new byte[RAN.nextInt(block_size/2-1)+n*block_size+1];
+      RAN.nextBytes(data);
+      System.out.println("data.length=" + data.length);
+  
+      //write data to a file
+      final Path foo = new Path(dir, "foo" + n);
+      {
+        final FSDataOutputStream out = hdfs.create(foo, false, buffer_size,
+            (short)2, block_size);
+        out.write(data);
+        out.close();
+      }
+      
+      //compute checksum
+      final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
+      System.out.println("hdfsfoocs=" + hdfsfoocs);
+      
+      final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
+      System.out.println("hftpfoocs=" + hftpfoocs);
+
+      final Path qualified = new Path(hftpuri + dir, "foo" + n);
+      final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
+      System.out.println("qfoocs=" + qfoocs);
+
+      //write another file
+      final Path bar = new Path(dir, "bar" + n);
+      {
+        final FSDataOutputStream out = hdfs.create(bar, false, buffer_size,
+            (short)2, block_size);
+        out.write(data);
+        out.close();
+      }
+  
+      { //verify checksum
+        final FileChecksum barcs = hdfs.getFileChecksum(bar);
+        final int barhashcode = barcs.hashCode();
+        assertEquals(hdfsfoocs.hashCode(), barhashcode);
+        assertEquals(hdfsfoocs, barcs);
+
+        assertEquals(hftpfoocs.hashCode(), barhashcode);
+        assertEquals(hftpfoocs, barcs);
+
+        assertEquals(qfoocs.hashCode(), barhashcode);
+        assertEquals(qfoocs, barcs);
+      }
+
+      { //test permission error on hftp 
+        hdfs.setPermission(new Path(dir), new FsPermission((short)0));
+        try {
+          final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
+          final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+          hftp2.getFileChecksum(qualified);
+          fail();
+        } catch(IOException ioe) {
+          FileSystem.LOG.info("GOOD: getting an exception", ioe);
+        }
+      }
+    }
+    cluster.shutdown();
+  }
+  
+  @Test
+  public void testAllWithDualPort() throws Exception {
+    dualPortTesting = true;
+
+    testFileSystemCloseAll();
+    testDFSClose();
+    testDFSClient();
+    testFileChecksum();
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java Fri Sep 17 19:51:27 2010
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Random;
@@ -29,17 +34,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
-
-import static org.junit.Assert.*;
-import org.junit.Test;
-import org.junit.BeforeClass;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * This class tests the FileStatus API.
@@ -184,7 +189,7 @@ public class TestFileStatus {
 
   /** Test FileStatus objects obtained from a directory */
   @Test
-  public void testGetFileStatusOnDir() throws IOException {
+  public void testGetFileStatusOnDir() throws Exception {
     // Create the directory
     Path dir = new Path("/test/mkdirs");
     assertTrue("mkdir failed", fs.mkdirs(dir));
@@ -284,5 +289,17 @@ public class TestFileStatus {
     assertEquals(file2.toString(), itor.next().getPath().toString());
     assertEquals(file3.toString(), itor.next().getPath().toString());
     assertFalse(itor.hasNext());      
+
+    { //test permission error on hftp 
+      fs.setPermission(dir, new FsPermission((short)0));
+      try {
+        final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
+        final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+        hftp2.getContentSummary(dir);
+        fail();
+      } catch(IOException ioe) {
+        FileSystem.LOG.info("GOOD: getting an exception", ioe);
+      }
+    }
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java Fri Sep 17 19:51:27 2010
@@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -100,6 +102,29 @@ public class TestListPathServlet {
     // Non existent path
     checkStatus("/nonexistent");
     checkStatus("/nonexistent/a");
+
+    final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
+    final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, "somegroup");
+    { //test file not found on hftp 
+      final Path nonexistent = new Path("/nonexistent");
+      try {
+        hftp2.getFileStatus(nonexistent);
+        Assert.fail();
+      } catch(IOException ioe) {
+        FileSystem.LOG.info("GOOD: getting an exception", ioe);
+      }
+    }
+
+    { //test permission error on hftp
+      final Path dir = new Path("/dir");
+      fs.setPermission(dir, new FsPermission((short)0));
+      try {
+        hftp2.getFileStatus(new Path(dir, "a"));
+        Assert.fail();
+      } catch(IOException ioe) {
+        FileSystem.LOG.info("GOOD: getting an exception", ioe);
+      }
+    }
   }
 
   private void checkStatus(String listdir) throws IOException {

Propchange: hadoop/hdfs/branches/HDFS-1052/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/webapps/datanode:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/webapps/datanode:820487
-/hadoop/hdfs/trunk/src/webapps/datanode:987665-997035
+/hadoop/hdfs/trunk/src/webapps/datanode:987665-998256

Propchange: hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs:820487
-/hadoop/hdfs/trunk/src/webapps/hdfs:987665-997035
+/hadoop/hdfs/trunk/src/webapps/hdfs:987665-998256

Propchange: hadoop/hdfs/branches/HDFS-1052/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/webapps/secondary:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/webapps/secondary:820487
-/hadoop/hdfs/trunk/src/webapps/secondary:987665-997035
+/hadoop/hdfs/trunk/src/webapps/secondary:987665-998256



Mime
View raw message