hadoop-common-commits mailing list archives

From wan...@apache.org
Subject [11/50] [abbrv] hadoop git commit: HADOOP-10682. Replace FsDatasetImpl object lock with a separate lock object. (Chen Liang)
Date Tue, 09 Aug 2016 22:33:53 GMT
HADOOP-10682. Replace FsDatasetImpl object lock with a separate lock object. (Chen Liang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8c063847
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8c063847
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8c063847

Branch: refs/heads/YARN-3368
Commit: 8c0638471f8f1dd47667b2d6727d4d2d54e4b48c
Parents: 6255859
Author: Arpit Agarwal <arp@apache.org>
Authored: Mon Aug 8 12:02:53 2016 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Mon Aug 8 12:02:53 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/datanode/DataNode.java   |   3 +-
 .../hdfs/server/datanode/DirectoryScanner.java  |   3 +-
 .../hdfs/server/datanode/DiskBalancer.java      |   3 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   6 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 927 ++++++++++---------
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   9 +-
 .../server/datanode/SimulatedFSDataset.java     |   9 +
 .../hdfs/server/datanode/TestBlockRecovery.java |   3 +-
 .../server/datanode/TestDirectoryScanner.java   |   9 +-
 .../extdataset/ExternalDatasetImpl.java         |   5 +
 10 files changed, 533 insertions(+), 444 deletions(-)
----------------------------------------------------------------------
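
The patch replaces implicit monitor locking (synchronized (this) / synchronized (data)) with an explicit lock object held through try-with-resources. As a rough sketch of the idiom, assuming AutoCloseableLock is a thin wrapper around java.util.concurrent.locks.ReentrantLock whose acquire() blocks, takes the lock, and returns itself, and whose close() releases it (the imports in the hunks below reference org.apache.hadoop.util.AutoCloseableLock; the real implementation may differ in detail):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    /** Sketch only; the real org.apache.hadoop.util.AutoCloseableLock may differ. */
    public class AutoCloseableLockSketch implements AutoCloseable {
      private final Lock lock = new ReentrantLock();

      /** Block until the lock is held, then return this so it fits in try (...). */
      public AutoCloseableLockSketch acquire() {
        lock.lock();
        return this;
      }

      /** Invoked automatically when the try block exits; releases the lock. */
      @Override
      public void close() {
        lock.unlock();
      }
    }

The payoff over synchronized is that the lock becomes a first-class object: FsDatasetImpl can hand it to collaborators via acquireDatasetLock() instead of forcing them to synchronize on the dataset instance itself, and the lock implementation can later be swapped (for example, for a fair or instrumented lock) without touching call sites.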


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ac55397..e502f1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
@@ -2877,7 +2878,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;
 
     //get replica information
-    synchronized(data) {
+    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 1db445e..7c8985e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -583,7 +584,7 @@ public class DirectoryScanner implements Runnable {
     Map<String, ScanInfo[]> diskReport = getDiskReport();
 
     // Hold FSDataset lock to prevent further changes to the block map
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
         String bpid = entry.getKey();
         ScanInfo[] blockpoolReport = entry.getValue();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index c73b10f..1f61861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@@ -454,7 +455,7 @@ public class DiskBalancer {
     Map<String, FsVolumeSpi> pathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      synchronized (this.dataset) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index eeab098..acc269a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -639,4 +640,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
       FsVolumeSpi destination) throws IOException;
+
+  /**
+   * Acquire the lock of the data set.
+   */
+  AutoCloseableLock acquireDatasetLock();
 }

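DataNode, DirectoryScanner, and DiskBalancer call this new interface method in place of synchronized (dataset), as the hunks above show. A self-contained toy demonstration of the resulting mutual exclusion, reusing the hypothetical sketch class from the earlier note (the names here are illustrative, not part of the patch):

    /** Hypothetical demo: two threads contend on one dataset-style lock. */
    public class LockDemo {
      public static void main(String[] args) throws InterruptedException {
        final AutoCloseableLockSketch datasetLock = new AutoCloseableLockSketch();
        Runnable task = () -> {
          try (AutoCloseableLockSketch lock = datasetLock.acquire()) {
            // Only one thread at a time executes here, exactly as if the
            // body were inside a synchronized block on a dedicated object.
            System.out.println(Thread.currentThread().getName()
                + " holds the dataset lock");
          } // close() runs here and releases the lock, even on exceptions
        };
        Thread t1 = new Thread(task, "writer-1");
        Thread t2 = new Thread(task, "writer-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }

Each thread prints only while it holds the lock, and the try block guarantees release on every exit path, including exceptions — the same guarantee a synchronized block gives.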
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c0f2fbd..c4d924e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -175,21 +176,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override
-  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
-    final ReplicaInfo r =  volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
-    return r != null? (FsVolumeImpl)r.getVolume(): null;
+  public FsVolumeImpl getVolume(final ExtendedBlock b) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo r =
+          volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+      return r != null ? (FsVolumeImpl) r.getVolume() : null;
+    }
   }
 
   @Override // FsDatasetSpi
-  public synchronized Block getStoredBlock(String bpid, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid, false);
-    if (blockfile == null) {
-      return null;
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      File blockfile = getFile(bpid, blkid, false);
+      if (blockfile == null) {
+        return null;
+      }
+      final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+      final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+      return new Block(blkid, blockfile.length(), gs);
     }
-    final File metafile = FsDatasetUtil.findMetaFile(blockfile);
-    final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
-    return new Block(blkid, blockfile.length(), gs);
   }
 
 
@@ -258,6 +264,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private boolean blockPinningEnabled;
   private final int maxDataLength;
+
+  private final AutoCloseableLock datasetLock;
   
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -269,6 +277,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+    this.datasetLock = new AutoCloseableLock();
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -341,6 +350,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
+
   /**
    * Gets initial volume failure information for all volumes that failed
    * immediately at startup.  The method works by determining the set difference
@@ -375,25 +389,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Activate a volume to serve requests.
    * @throws IOException if the storage UUID already exists.
    */
-  private synchronized void activateVolume(
+  private void activateVolume(
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
-    if (dnStorage != null) {
-      final String errorMsg = String.format(
-          "Found duplicated storage UUID: %s in %s.",
-          sd.getStorageUuid(), sd.getVersionFile());
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
-    }
-    volumeMap.addAll(replicaMap);
-    storageMap.put(sd.getStorageUuid(),
-        new DatanodeStorage(sd.getStorageUuid(),
-            DatanodeStorage.State.NORMAL,
-            storageType));
-    asyncDiskService.addVolume(sd.getCurrentDir());
-    volumes.addVolume(ref);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
+      if (dnStorage != null) {
+        final String errorMsg = String.format(
+            "Found duplicated storage UUID: %s in %s.",
+            sd.getStorageUuid(), sd.getVersionFile());
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+      volumeMap.addAll(replicaMap);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(),
+              DatanodeStorage.State.NORMAL,
+              storageType));
+      asyncDiskService.addVolume(sd.getCurrentDir());
+      volumes.addVolume(ref);
+    }
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -488,7 +504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -534,7 +550,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -743,7 +759,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                                          boolean touch)
       throws IOException {
     final File f;
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
@@ -809,22 +825,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Returns handles to the block file and its metadata file
    */
   @Override // FsDatasetSpi
-  public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
+  public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    ReplicaInfo info = getReplicaInfo(b);
-    FsVolumeReference ref = info.getVolume().obtainReference();
-    try {
-      InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo info = getReplicaInfo(b);
+      FsVolumeReference ref = info.getVolume().obtainReference();
       try {
-        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
-        return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+        try {
+          InputStream metaInStream =
+              openAndSeek(info.getMetaFile(), metaOffset);
+          return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        } catch (IOException e) {
+          IOUtils.cleanup(null, blockInStream);
+          throw e;
+        }
       } catch (IOException e) {
-        IOUtils.cleanup(null, blockInStream);
+        IOUtils.cleanup(null, ref);
         throw e;
       }
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
     }
   }
 
@@ -943,7 +962,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
@@ -985,7 +1004,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     newReplicaInfo.setNumBytes(blockFiles[1].length());
     // Finalize the copied files
     newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // Increment numBlocks here as this block moved without knowing to BPS
       FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
       volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1015,7 +1034,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1143,41 +1162,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
 
   @Override  // FsDatasetSpi
-  public synchronized ReplicaHandler append(ExtendedBlock b,
+  public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    // If the block was successfully finalized because all packets
-    // were successfully processed at the Datanode but the ack for
-    // some of the packets were not received by the client. The client 
-    // re-opens the connection and retries sending those packets.
-    // The other reason is that an "append" is occurring to this block.
-    
-    // check the validity of the parameter
-    if (newGS < b.getGenerationStamp()) {
-      throw new IOException("The new generation stamp " + newGS + 
-          " should be greater than the replica " + b + "'s generation stamp");
-    }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    LOG.info("Appending to " + replicaInfo);
-    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
-    }
-    if (replicaInfo.getNumBytes() != expectedBlockLen) {
-      throw new IOException("Corrupted replica " + replicaInfo + 
-          " with a length of " + replicaInfo.getNumBytes() + 
-          " expected length is " + expectedBlockLen);
-    }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block was successfully finalized because all packets
+      // were successfully processed at the Datanode but the ack for
+      // some of the packets were not received by the client. The client
+      // re-opens the connection and retries sending those packets.
+      // The other reason is that an "append" is occurring to this block.
+
+      // check the validity of the parameter
+      if (newGS < b.getGenerationStamp()) {
+        throw new IOException("The new generation stamp " + newGS +
+            " should be greater than the replica " + b + "'s generation stamp");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      LOG.info("Appending to " + replicaInfo);
+      if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+      }
+      if (replicaInfo.getNumBytes() != expectedBlockLen) {
+        throw new IOException("Corrupted replica " + replicaInfo +
+            " with a length of " + replicaInfo.getNumBytes() +
+            " expected length is " + expectedBlockLen);
+      }
 
-    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-    ReplicaBeingWritten replica = null;
-    try {
-      replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
-          b.getNumBytes());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+      ReplicaBeingWritten replica = null;
+      try {
+        replica = append(b.getBlockPoolId(),
+            (FinalizedReplica) replicaInfo, newGS,
+            b.getNumBytes());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(replica, ref);
     }
-    return new ReplicaHandler(replica, ref);
   }
   
   /** Append to a finalized replica
@@ -1192,66 +1214,68 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @throws IOException if moving the replica from finalized directory 
    *         to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(String bpid,
+  private ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    // If the block is cached, start uncaching it.
-    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
-
-    // If there are any hardlinks to the block, break them.  This ensures we are
-    // not appending to a file that is part of a previous/ directory.
-    replicaInfo.breakHardLinksIfNeeded();
-    
-    // construct a RBW replica with the new GS
-    File blkfile = replicaInfo.getBlockFile();
-    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
-    if (v.getAvailable() < bytesReserved) {
-      throw new DiskOutOfSpaceException("Insufficient space for appending to "
-          + replicaInfo);
-    }
-    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
-    File oldmeta = replicaInfo.getMetaFile();
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
-    File newmeta = newReplicaInfo.getMetaFile();
-
-    // rename meta file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-    }
-    try {
-      NativeIO.renameTo(oldmeta, newmeta);
-    } catch (IOException e) {
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-                            " Unable to move meta file  " + oldmeta +
-                            " to rbw dir " + newmeta, e);
-    }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block is cached, start uncaching it.
+      cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+
+      // If there are any hardlinks to the block, break them.  This ensures we
+      // are not appending to a file that is part of a previous/ directory.
+      replicaInfo.breakHardLinksIfNeeded();
+
+      // construct a RBW replica with the new GS
+      File blkfile = replicaInfo.getBlockFile();
+      FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+      long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+      if (v.getAvailable() < bytesReserved) {
+        throw new DiskOutOfSpaceException("Insufficient space for appending to "
+            + replicaInfo);
+      }
+      File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
+      File oldmeta = replicaInfo.getMetaFile();
+      ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+          replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+          v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+      File newmeta = newReplicaInfo.getMetaFile();
+
+      // rename meta file to rbw directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+      }
+      try {
+        NativeIO.renameTo(oldmeta, newmeta);
+      } catch (IOException e) {
+        throw new IOException("Block " + replicaInfo + " reopen failed. " +
+            " Unable to move meta file  " + oldmeta +
+            " to rbw dir " + newmeta, e);
+      }
 
-    // rename block file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
-          + ", file length=" + blkfile.length());
-    }
-    try {
-      NativeIO.renameTo(blkfile, newBlkFile);
-    } catch (IOException e) {
+      // rename block file to rbw directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+            + ", file length=" + blkfile.length());
+      }
       try {
-        NativeIO.renameTo(newmeta, oldmeta);
-      } catch (IOException ex) {
-        LOG.warn("Cannot move meta file " + newmeta + 
-            "back to the finalized directory " + oldmeta, ex);
+        NativeIO.renameTo(blkfile, newBlkFile);
+      } catch (IOException e) {
+        try {
+          NativeIO.renameTo(newmeta, oldmeta);
+        } catch (IOException ex) {
+          LOG.warn("Cannot move meta file " + newmeta +
+              "back to the finalized directory " + oldmeta, ex);
+        }
+        throw new IOException("Block " + replicaInfo + " reopen failed. " +
+            " Unable to move block file " + blkfile +
+            " to rbw dir " + newBlkFile, e);
       }
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-                              " Unable to move block file " + blkfile +
-                              " to rbw dir " + newBlkFile, e);
+
+      // Replace finalized replica by a RBW replica in replicas map
+      volumeMap.add(bpid, newReplicaInfo);
+      v.reserveSpaceForReplica(bytesReserved);
+      return newReplicaInfo;
     }
-    
-    // Replace finalized replica by a RBW replica in replicas map
-    volumeMap.add(bpid, newReplicaInfo);
-    v.reserveSpaceForReplica(bytesReserved);
-    return newReplicaInfo;
   }
 
   private static class MustStopExistingWriter extends Exception {
@@ -1321,7 +1345,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
@@ -1353,7 +1377,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1400,62 +1424,65 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler createRbw(
+  public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
-        b.getBlockId());
-    if (replicaInfo != null) {
-      throw new ReplicaAlreadyExistsException("Block " + b +
-      " already exists in state " + replicaInfo.getState() +
-      " and thus cannot be created.");
-    }
-    // create a new block
-    FsVolumeReference ref = null;
-
-    // Use ramdisk only if block size is a multiple of OS page size.
-    // This simplifies reservation for partially used replicas
-    // significantly.
-    if (allowLazyPersist &&
-        lazyWriter != null &&
-        b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
-        reserveLockedMemory(b.getNumBytes())) {
-      try {
-        // First try to place the block on a transient volume.
-        ref = volumes.getNextTransientVolume(b.getNumBytes());
-        datanode.getMetrics().incrRamDiskBlocksWrite();
-      } catch(DiskOutOfSpaceException de) {
-        // Ignore the exception since we just fall back to persistent storage.
-      } finally {
-        if (ref == null) {
-          cacheManager.release(b.getNumBytes());
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getBlockId());
+      if (replicaInfo != null) {
+        throw new ReplicaAlreadyExistsException("Block " + b +
+            " already exists in state " + replicaInfo.getState() +
+            " and thus cannot be created.");
+      }
+      // create a new block
+      FsVolumeReference ref = null;
+
+      // Use ramdisk only if block size is a multiple of OS page size.
+      // This simplifies reservation for partially used replicas
+      // significantly.
+      if (allowLazyPersist &&
+          lazyWriter != null &&
+          b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
+          reserveLockedMemory(b.getNumBytes())) {
+        try {
+          // First try to place the block on a transient volume.
+          ref = volumes.getNextTransientVolume(b.getNumBytes());
+          datanode.getMetrics().incrRamDiskBlocksWrite();
+        } catch (DiskOutOfSpaceException de) {
+          // Ignore the exception since we just fall back to persistent storage.
+        } finally {
+          if (ref == null) {
+            cacheManager.release(b.getNumBytes());
+          }
         }
       }
-    }
 
-    if (ref == null) {
-      ref = volumes.getNextVolume(storageType, b.getNumBytes());
-    }
+      if (ref == null) {
+        ref = volumes.getNextVolume(storageType, b.getNumBytes());
+      }
 
-    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-    // create an rbw file to hold block in the designated volume
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      // create an rbw file to hold block in the designated volume
 
-    if (allowLazyPersist && !v.isTransientStorage()) {
-      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
-    }
+      if (allowLazyPersist && !v.isTransientStorage()) {
+        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+      }
 
-    File f;
-    try {
-      f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
-    }
+      File f;
+      try {
+        f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
 
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
-        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
-    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    return new ReplicaHandler(newReplicaInfo, ref);
+      ReplicaBeingWritten newReplicaInfo =
+          new ReplicaBeingWritten(b.getBlockId(),
+          b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      return new ReplicaHandler(newReplicaInfo, ref);
+    }
   }
 
   @Override // FsDatasetSpi
@@ -1466,7 +1493,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
           
           // check the replica's state
@@ -1487,61 +1514,64 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+  private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    // check generation stamp
-    long replicaGenerationStamp = rbw.getGenerationStamp();
-    if (replicaGenerationStamp < b.getGenerationStamp() ||
-        replicaGenerationStamp > newGS) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
-          ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
-          newGS + "].");
-    }
-    
-    // check replica length
-    long bytesAcked = rbw.getBytesAcked();
-    long numBytes = rbw.getNumBytes();
-    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
-      throw new ReplicaNotFoundException("Unmatched length replica " + 
-          rbw + ": BytesAcked = " + bytesAcked + 
-          " BytesRcvd = " + numBytes + " are not in the range of [" + 
-          minBytesRcvd + ", " + maxBytesRcvd + "].");
-    }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // check generation stamp
+      long replicaGenerationStamp = rbw.getGenerationStamp();
+      if (replicaGenerationStamp < b.getGenerationStamp() ||
+          replicaGenerationStamp > newGS) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+                ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+                newGS + "].");
+      }
 
-    FsVolumeReference ref = rbw.getVolume().obtainReference();
-    try {
-      // Truncate the potentially corrupt portion.
-      // If the source was client and the last node in the pipeline was lost,
-      // any corrupt data written after the acked length can go unnoticed.
-      if (numBytes > bytesAcked) {
-        final File replicafile = rbw.getBlockFile();
-        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
-        rbw.setNumBytes(bytesAcked);
-        rbw.setLastChecksumAndDataLen(bytesAcked, null);
-      }
-
-      // bump the replica's generation stamp to newGS
-      bumpReplicaGS(rbw, newGS);
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      // check replica length
+      long bytesAcked = rbw.getBytesAcked();
+      long numBytes = rbw.getNumBytes();
+      if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
+        throw new ReplicaNotFoundException("Unmatched length replica " +
+            rbw + ": BytesAcked = " + bytesAcked +
+            " BytesRcvd = " + numBytes + " are not in the range of [" +
+            minBytesRcvd + ", " + maxBytesRcvd + "].");
+      }
+
+      FsVolumeReference ref = rbw.getVolume().obtainReference();
+      try {
+        // Truncate the potentially corrupt portion.
+        // If the source was client and the last node in the pipeline was lost,
+        // any corrupt data written after the acked length can go unnoticed.
+        if (numBytes > bytesAcked) {
+          final File replicafile = rbw.getBlockFile();
+          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          rbw.setNumBytes(bytesAcked);
+          rbw.setLastChecksumAndDataLen(bytesAcked, null);
+        }
+
+        // bump the replica's generation stamp to newGS
+        bumpReplicaGS(rbw, newGS);
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(rbw, ref);
     }
-    return new ReplicaHandler(rbw, ref);
   }
   
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline convertTemporaryToRbw(
+  public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
-    final long blockId = b.getBlockId();
-    final long expectedGs = b.getGenerationStamp();
-    final long visible = b.getNumBytes();
-    LOG.info("Convert " + b + " from Temporary to RBW, visible length="
-        + visible);
-
-    final ReplicaInPipeline temp;
-    {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = b.getBlockId();
+      final long expectedGs = b.getGenerationStamp();
+      final long visible = b.getNumBytes();
+      LOG.info("Convert " + b + " from Temporary to RBW, visible length="
+          + visible);
+
+      final ReplicaInPipeline temp;
+
       // get replica
       final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
       if (r == null) {
@@ -1553,43 +1583,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         throw new ReplicaAlreadyExistsException(
             "r.getState() != ReplicaState.TEMPORARY, r=" + r);
       }
-      temp = (ReplicaInPipeline)r;
-    }
-    // check generation stamp
-    if (temp.getGenerationStamp() != expectedGs) {
-      throw new ReplicaAlreadyExistsException(
-          "temp.getGenerationStamp() != expectedGs = " + expectedGs
-          + ", temp=" + temp);
-    }
+      temp = (ReplicaInPipeline) r;
 
-    // TODO: check writer?
-    // set writer to the current thread
-    // temp.setWriter(Thread.currentThread());
+      // check generation stamp
+      if (temp.getGenerationStamp() != expectedGs) {
+        throw new ReplicaAlreadyExistsException(
+            "temp.getGenerationStamp() != expectedGs = " + expectedGs
+                + ", temp=" + temp);
+      }
 
-    // check length
-    final long numBytes = temp.getNumBytes();
-    if (numBytes < visible) {
-      throw new IOException(numBytes + " = numBytes < visible = "
-          + visible + ", temp=" + temp);
-    }
-    // check volume
-    final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
-    if (v == null) {
-      throw new IOException("r.getVolume() = null, temp="  + temp);
+      // TODO: check writer?
+      // set writer to the current thread
+      // temp.setWriter(Thread.currentThread());
+
+      // check length
+      final long numBytes = temp.getNumBytes();
+      if (numBytes < visible) {
+        throw new IOException(numBytes + " = numBytes < visible = "
+            + visible + ", temp=" + temp);
+      }
+      // check volume
+      final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
+      if (v == null) {
+        throw new IOException("r.getVolume() = null, temp=" + temp);
+      }
+
+      // move block files to the rbw directory
+      BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
+          bpslice.getRbwDir());
+      // create RBW
+      final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+          blockId, numBytes, expectedGs,
+          v, dest.getParentFile(), Thread.currentThread(), 0);
+      rbw.setBytesAcked(visible);
+      // overwrite the RBW in the volume map
+      volumeMap.add(b.getBlockPoolId(), rbw);
+      return rbw;
     }
-    
-    // move block files to the rbw directory
-    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(), 
-        bpslice.getRbwDir());
-    // create RBW
-    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
-        blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread(), 0);
-    rbw.setBytesAcked(visible);
-    // overwrite the RBW in the volume map
-    volumeMap.add(b.getBlockPoolId(), rbw);
-    return rbw;
   }
 
   @Override // FsDatasetSpi
@@ -1599,7 +1630,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
     do {
-      synchronized (this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1678,72 +1709,82 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Complete the block write!
    */
   @Override // FsDatasetSpi
-  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
-    if (Thread.interrupted()) {
-      // Don't allow data modifications from interrupted threads
-      throw new IOException("Cannot finalize block from Interrupted Thread");
-    }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-      // this is legal, when recovery happens on a file that has
-      // been opened for append but never modified
-      return;
+  public void finalizeBlock(ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      if (Thread.interrupted()) {
+        // Don't allow data modifications from interrupted threads
+        throw new IOException("Cannot finalize block from Interrupted Thread");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        // this is legal, when recovery happens on a file that has
+        // been opened for append but never modified
+        return;
+      }
+      finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
-    finalizeReplica(b.getBlockPoolId(), replicaInfo);
   }
   
-  private synchronized FinalizedReplica finalizeReplica(String bpid,
+  private FinalizedReplica finalizeReplica(String bpid,
       ReplicaInfo replicaInfo) throws IOException {
-    FinalizedReplica newReplicaInfo = null;
-    if (replicaInfo.getState() == ReplicaState.RUR &&
-       ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() == 
-         ReplicaState.FINALIZED) {
-      newReplicaInfo = (FinalizedReplica)
-             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
-    } else {
-      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-      File f = replicaInfo.getBlockFile();
-      if (v == null) {
-        throw new IOException("No volume for temporary file " + f + 
-            " for block " + replicaInfo);
-      }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      FinalizedReplica newReplicaInfo = null;
+      if (replicaInfo.getState() == ReplicaState.RUR &&
+          ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
+              == ReplicaState.FINALIZED) {
+        newReplicaInfo = (FinalizedReplica)
+            ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+      } else {
+        FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+        File f = replicaInfo.getBlockFile();
+        if (v == null) {
+          throw new IOException("No volume for temporary file " + f +
+              " for block " + replicaInfo);
+        }
 
-      File dest = v.addFinalizedBlock(
-          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
-      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+        File dest = v.addFinalizedBlock(
+            bpid, replicaInfo, f, replicaInfo.getBytesReserved());
+        newReplicaInfo =
+            new FinalizedReplica(replicaInfo, v, dest.getParentFile());
 
-      if (v.isTransientStorage()) {
-        releaseLockedMemory(
-            replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
-            false);
-        ramDiskReplicaTracker.addReplica(
-            bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        if (v.isTransientStorage()) {
+          releaseLockedMemory(
+              replicaInfo.getOriginalBytesReserved()
+                  - replicaInfo.getNumBytes(),
+              false);
+          ramDiskReplicaTracker.addReplica(
+              bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
+          datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        }
       }
-    }
-    volumeMap.add(bpid, newReplicaInfo);
+      volumeMap.add(bpid, newReplicaInfo);
 
-    return newReplicaInfo;
+      return newReplicaInfo;
+    }
   }
 
   /**
    * Remove the temporary block file (if any)
    */
   @Override // FsDatasetSpi
-  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
-        b.getLocalBlock());
-    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
-      // remove from volumeMap
-      volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-
-      // delete the on-disk temp file
-      if (delBlockFromDisk(replicaInfo.getBlockFile(), 
-          replicaInfo.getMetaFile(), b.getLocalBlock())) {
-        LOG.warn("Block " + b + " unfinalized and removed. " );
-      }
-      if (replicaInfo.getVolume().isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+  public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getLocalBlock());
+      if (replicaInfo != null
+          && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+        // remove from volumeMap
+        volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
+
+        // delete the on-disk temp file
+        if (delBlockFromDisk(replicaInfo.getBlockFile(),
+            replicaInfo.getMetaFile(), b.getLocalBlock())) {
+          LOG.warn("Block " + b + " unfinalized and removed. ");
+        }
+        if (replicaInfo.getVolume().isTransientStorage()) {
+          ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
+              b.getBlockId(), true);
+        }
       }
     }
   }
@@ -1791,7 +1832,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
     }
 
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
           case FINALIZED:
@@ -1824,31 +1865,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }
 
   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(!b.getVolume().isTransientStorage() &&
-         b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(
+      String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (!b.getVolume().isTransientStorage() &&
+            b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }
 
   /**
@@ -1924,7 +1970,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(bpid, blockId, false);
     }
     
@@ -1973,7 +2019,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
       final FsVolumeImpl v;
-      synchronized (this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           // It is okay if the block is not found -- it may be deleted earlier.
@@ -2084,7 +2130,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long length, genstamp;
     Executor volumeExecutor;
 
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2151,9 +2197,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized boolean contains(final ExtendedBlock block) {
-    final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId, false) != null;
+  public boolean contains(final ExtendedBlock block) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = block.getLocalBlock().getBlockId();
+      return getFile(block.getBlockPoolId(), blockId, false) != null;
+    }
   }
 
   /**
@@ -2279,7 +2327,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
@@ -2435,9 +2483,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override 
-  public synchronized String getReplicaString(String bpid, long blockId) {
-    final Replica r = volumeMap.get(bpid, blockId);
-    return r == null? "null": r.toString();
+  public String getReplicaString(String bpid, long blockId) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica r = volumeMap.get(bpid, blockId);
+      return r == null ? "null" : r.toString();
+    }
   }
 
   @Override // FsDatasetSpi
@@ -2530,67 +2580,69 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized Replica updateReplicaUnderRecovery(
+  public Replica updateReplicaUnderRecovery(
                                     final ExtendedBlock oldBlock,
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newlength) throws IOException {
-    //get replica
-    final String bpid = oldBlock.getBlockPoolId();
-    final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
-    LOG.info("updateReplica: " + oldBlock
-                 + ", recoveryId=" + recoveryId
-                 + ", length=" + newlength
-                 + ", replica=" + replica);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      //get replica
+      final String bpid = oldBlock.getBlockPoolId();
+      final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
+      LOG.info("updateReplica: " + oldBlock
+          + ", recoveryId=" + recoveryId
+          + ", length=" + newlength
+          + ", replica=" + replica);
 
-    //check replica
-    if (replica == null) {
-      throw new ReplicaNotFoundException(oldBlock);
-    }
+      //check replica
+      if (replica == null) {
+        throw new ReplicaNotFoundException(oldBlock);
+      }
 
-    //check replica state
-    if (replica.getState() != ReplicaState.RUR) {
-      throw new IOException("replica.getState() != " + ReplicaState.RUR
-          + ", replica=" + replica);
-    }
+      //check replica state
+      if (replica.getState() != ReplicaState.RUR) {
+        throw new IOException("replica.getState() != " + ReplicaState.RUR
+            + ", replica=" + replica);
+      }
 
-    //check replica's byte on disk
-    if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
-      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-          + " replica.getBytesOnDisk() != block.getNumBytes(), block="
-          + oldBlock + ", replica=" + replica);
-    }
-
-    //check replica files before update
-    checkReplicaFiles(replica);
-
-    //update replica
-    final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
-        .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
-        newBlockId, newlength);
-
-    boolean copyTruncate = newBlockId != oldBlock.getBlockId();
-    if(!copyTruncate) {
-      assert finalized.getBlockId() == oldBlock.getBlockId()
-          && finalized.getGenerationStamp() == recoveryId
-          && finalized.getNumBytes() == newlength
-          : "Replica information mismatched: oldBlock=" + oldBlock
-              + ", recoveryId=" + recoveryId + ", newlength=" + newlength
-              + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
-    } else {
-      assert finalized.getBlockId() == oldBlock.getBlockId()
-          && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
-          && finalized.getNumBytes() == oldBlock.getNumBytes()
-          : "Finalized and old information mismatched: oldBlock=" + oldBlock
-              + ", genStamp=" + oldBlock.getGenerationStamp()
-              + ", len=" + oldBlock.getNumBytes()
-              + ", finalized=" + finalized;
-    }
+      //check replica's byte on disk
+      if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+            + " replica.getBytesOnDisk() != block.getNumBytes(), block="
+            + oldBlock + ", replica=" + replica);
+      }
+
+      //check replica files before update
+      checkReplicaFiles(replica);
+
+      //update replica
+      final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
+              .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
+          newBlockId, newlength);
+
+      boolean copyTruncate = newBlockId != oldBlock.getBlockId();
+      if (!copyTruncate) {
+        assert finalized.getBlockId() == oldBlock.getBlockId()
+            && finalized.getGenerationStamp() == recoveryId
+            && finalized.getNumBytes() == newlength
+            : "Replica information mismatched: oldBlock=" + oldBlock
+            + ", recoveryId=" + recoveryId + ", newlength=" + newlength
+            + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
+      } else {
+        assert finalized.getBlockId() == oldBlock.getBlockId()
+            && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
+            && finalized.getNumBytes() == oldBlock.getNumBytes()
+            : "Finalized and old information mismatched: oldBlock=" + oldBlock
+            + ", genStamp=" + oldBlock.getGenerationStamp()
+            + ", len=" + oldBlock.getNumBytes()
+            + ", finalized=" + finalized;
+      }
 
-    //check replica files after update
-    checkReplicaFiles(finalized);
+      //check replica files after update
+      checkReplicaFiles(finalized);
 
-    return finalized;
+      return finalized;
+    }
   }
 
   private FinalizedReplica updateReplicaUnderRecovery(
@@ -2668,23 +2720,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
+  public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    final Replica replica = getReplicaInfo(block.getBlockPoolId(), 
-        block.getBlockId());
-    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-      throw new IOException(
-          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
-          + block + ", replica=" + replica);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica replica = getReplicaInfo(block.getBlockPoolId(),
+          block.getBlockId());
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+                + block + ", replica=" + replica);
+      }
+      return replica.getVisibleLength();
     }
-    return replica.getVisibleLength();
   }
   
   @Override
   public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
     LOG.info("Adding block pool " + bpid);
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
@@ -2692,11 +2746,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override
-  public synchronized void shutdownBlockPool(String bpid) {
-    LOG.info("Removing block pool " + bpid);
-    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =  getBlockReports(bpid);
-    volumeMap.cleanUpBlockPool(bpid);
-    volumes.removeBlockPool(bpid, blocksPerVolume);
+  public void shutdownBlockPool(String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      LOG.info("Removing block pool " + bpid);
+      Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
+          = getBlockReports(bpid);
+      volumeMap.cleanUpBlockPool(bpid);
+      volumes.removeBlockPool(bpid, blocksPerVolume);
+    }
   }
   
   /**
@@ -2759,35 +2816,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override //FsDatasetSpi
-  public synchronized void deleteBlockPool(String bpid, boolean force)
+  public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
-    List<FsVolumeImpl> curVolumes = volumes.getVolumes();
-    if (!force) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      List<FsVolumeImpl> curVolumes = volumes.getVolumes();
+      if (!force) {
+        for (FsVolumeImpl volume : curVolumes) {
+          try (FsVolumeReference ref = volume.obtainReference()) {
+            if (!volume.isBPDirEmpty(bpid)) {
+              LOG.warn(bpid
+                  + " has some block files, cannot delete unless forced");
+              throw new IOException("Cannot delete block pool, "
+                  + "it contains some block files");
+            }
+          } catch (ClosedChannelException e) {
+            // ignore.
+          }
+        }
+      }
       for (FsVolumeImpl volume : curVolumes) {
         try (FsVolumeReference ref = volume.obtainReference()) {
-          if (!volume.isBPDirEmpty(bpid)) {
-            LOG.warn(bpid + " has some block files, cannot delete unless forced");
-            throw new IOException("Cannot delete block pool, "
-                + "it contains some block files");
-          }
+          volume.deleteBPDirectories(bpid, force);
         } catch (ClosedChannelException e) {
           // ignore.
         }
       }
     }
-    for (FsVolumeImpl volume : curVolumes) {
-      try (FsVolumeReference ref = volume.obtainReference()) {
-        volume.deleteBPDirectories(bpid, force);
-      } catch (ClosedChannelException e) {
-        // ignore.
-      }
-    }
   }
   
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
@@ -2838,7 +2898,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,
       long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-    synchronized (FsDatasetImpl.this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
 
       targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -2972,7 +3032,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          synchronized (FsDatasetImpl.this) {
+          try (AutoCloseableLock lock = datasetLock.acquire()) {
             replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
 
             // If replicaInfo is null, the block was either deleted before
@@ -3042,7 +3102,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         long blockFileUsed, metaFileUsed;
         final String bpid = replicaState.getBlockPoolId();
 
-        synchronized (FsDatasetImpl.this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
                                        replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3219,14 +3279,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.timer = newTimer;
   }
 
-  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    for (String blockPoolId : volumeMap.getBlockPoolList()) {
-      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
-      for (ReplicaInfo replicaInfo : replicas) {
-        if (replicaInfo instanceof ReplicaInPipeline
-            && replicaInfo.getVolume().equals(volume)) {
-          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
-          replicaInPipeline.interruptThread();
+  void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      for (String blockPoolId : volumeMap.getBlockPoolList()) {
+        Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+        for (ReplicaInfo replicaInfo : replicas) {
+          if (replicaInfo instanceof ReplicaInPipeline
+              && replicaInfo.getVolume().equals(volume)) {
+            ReplicaInPipeline replicaInPipeline
+                = (ReplicaInPipeline) replicaInfo;
+            replicaInPipeline.interruptThread();
+          }
         }
       }
     }
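
A note on the pattern for readers skimming the diff: each synchronized
method or synchronized(this) block in FsDatasetImpl above becomes a
try-with-resources acquisition of a dedicated lock object. The following
minimal sketch shows the shape of the change; SimpleAutoCloseableLock and
Dataset are illustrative stand-ins written for this note, not the Hadoop
code itself (the real class is org.apache.hadoop.util.AutoCloseableLock).

    import java.util.concurrent.locks.ReentrantLock;

    // Simplified stand-in for org.apache.hadoop.util.AutoCloseableLock.
    class SimpleAutoCloseableLock implements AutoCloseable {
      private final ReentrantLock lock = new ReentrantLock();

      // Lock and return this, so acquisition composes with
      // try-with-resources.
      public SimpleAutoCloseableLock acquire() {
        lock.lock();
        return this;
      }

      @Override
      public void close() {
        lock.unlock(); // runs automatically when the try block exits
      }
    }

    class Dataset {
      private final SimpleAutoCloseableLock datasetLock =
          new SimpleAutoCloseableLock();

      // Before: public synchronized long visibleLength() { ... }
      // After: the same mutual exclusion, but against a lock object that
      // can also be handed to collaborators via acquireDatasetLock().
      public long visibleLength() {
        try (SimpleAutoCloseableLock l = datasetLock.acquire()) {
          return 0L; // placeholder for work done under the lock
        }
      }

      public SimpleAutoCloseableLock acquireDatasetLock() {
        return datasetLock.acquire();
      }
    }

Because the monitor is no longer the publicly reachable dataset object,
outside code can only take the lock through the explicit
acquireDatasetLock() API, and the lock implementation can later be swapped
(for example, for a fairness-aware or finer-grained lock) without touching
callers.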

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 4a446d4..59c9ed4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -304,7 +305,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
   private void decDfsUsedAndNumBlocks(String bpid, long value,
                                       boolean blockFileDeleted) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.decDfsUsed(value);
@@ -316,7 +317,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void incDfsUsedAndNumBlocks(String bpid, long value) {
-    synchronized (dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -326,7 +327,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void incDfsUsed(String bpid, long value) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -337,7 +338,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @VisibleForTesting
   public long getDfsUsed() throws IOException {
     long dfsUsed = 0;
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for(BlockPoolSlice s : bpSlices.values()) {
         dfsUsed += s.getDfsUsed();
       }
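
The FsVolumeImpl hunks above show the collaborator side of the same change:
code that previously did synchronized(dataset) now asks the dataset for its
lock. A hedged sketch, reusing the stand-in classes from the earlier note:

    // Volume is illustrative only, not the Hadoop FsVolumeImpl.
    class Volume {
      private final Dataset dataset;
      private long dfsUsed;

      Volume(Dataset dataset) {
        this.dataset = dataset;
      }

      void incDfsUsed(long value) {
        // Previously: synchronized (dataset) { dfsUsed += value; }
        try (SimpleAutoCloseableLock l = dataset.acquireDatasetLock()) {
          dfsUsed += value; // shared accounting mutated under the dataset lock
        }
      }
    }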

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 0565260..c0c76ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -39,6 +39,7 @@ import javax.management.StandardMBean;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -115,6 +116,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       DatanodeStorage.State.NORMAL;
   
   static final byte[] nullCrcFileData;
+
+  private final AutoCloseableLock datasetLock;
+
   static {
     DataChecksum checksum = DataChecksum.newDataChecksum(
         DataChecksum.Type.NULL, 16*1024 );
@@ -550,6 +554,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
     this.volume = new SimulatedVolume(this.storage);
+    this.datasetLock = new AutoCloseableLock();
   }
 
   public synchronized void injectBlocks(String bpid,
@@ -1366,5 +1371,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return null;
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
 }
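
Unlike the ExternalDatasetImpl stub further below, SimulatedFSDataset keeps
lock acquisition real by owning its own AutoCloseableLock, so tests that
serialize against the dataset still exercise mutual exclusion. One property
worth noting: assuming the lock is backed by a ReentrantLock (as the
stand-in sketched in the earlier note is), the same thread may safely
re-acquire it, which matters when one locked dataset method calls another:

    // Nested acquisition by one thread is safe with a reentrant lock;
    // each close() releases one hold.
    static void nestedAcquisition(SimpleAutoCloseableLock datasetLock) {
      try (SimpleAutoCloseableLock outer = datasetLock.acquire()) {
        try (SimpleAutoCloseableLock inner = datasetLock.acquire()) {
          // same thread re-enters without deadlocking
        }
      }
    }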
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 42e80fa..8183de8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -725,7 +726,7 @@ public class TestBlockRecovery {
             final RecoveringBlock recoveringBlock = new RecoveringBlock(
                 block.getBlock(), locations, block.getBlock()
                     .getGenerationStamp() + 1);
-            synchronized (dataNode.data) {
+            try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
               Thread.sleep(2000);
               dataNode.initReplicaRecovery(recoveringBlock);
             }
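
The TestBlockRecovery change above illustrates a common test pattern: hold
the dataset lock so that a concurrently issued operation must wait, proving
it takes the same lock. A sketch using the stand-ins from the earlier notes
(not the test's actual code):

    static void holdLockBriefly(Dataset dataset) throws InterruptedException {
      try (SimpleAutoCloseableLock l = dataset.acquireDatasetLock()) {
        Thread.sleep(2000); // keep the lock held while another thread contends
      }
    }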

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 48d4681..3822bad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -113,7 +114,7 @@ public class TestDirectoryScanner {
 
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -138,7 +139,7 @@ public class TestDirectoryScanner {
 
   /** Delete a block file */
   private long deleteBlockFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -154,7 +155,7 @@ public class TestDirectoryScanner {
 
   /** Delete block meta file */
   private long deleteMetaFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
@@ -173,7 +174,7 @@ public class TestDirectoryScanner {
    * @throws IOException
    */
   private void duplicateBlock(long blockId) throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
       try (FsDatasetSpi.FsVolumeReferences volumes =
           fds.getFsVolumeReferences()) {
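
The TestDirectoryScanner helpers above follow the same discipline: take the
dataset lock before corrupting or deleting replica files, so a concurrently
running scanner cannot observe a half-applied mutation. A hedged sketch
(the names are stand-ins from the earlier notes, not the test's helpers):

    import java.io.File;
    import java.util.List;

    static void deleteReplicaFiles(Dataset fds, List<File> replicaFiles) {
      try (SimpleAutoCloseableLock l = fds.acquireDatasetLock()) {
        for (File f : replicaFiles) {
          f.delete(); // mutate on-disk state only while holding the lock
        }
      }
    }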

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c063847/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 8518ddd..b8564bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -450,4 +451,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
     return null;
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return null;
+  }
 }
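
Returning null from the ExternalDatasetImpl stub is safe for callers written
as try (AutoCloseableLock lock = data.acquireDatasetLock()) { ... }, because
Java's try-with-resources tolerates a null resource: the body still runs and
close() is simply skipped. A minimal, self-contained demonstration:

    public class NullResourceDemo {
      public static void main(String[] args) throws Exception {
        // Per the JLS, a null resource is never closed, so this exits
        // cleanly with no NullPointerException.
        try (AutoCloseable lock = null) {
          System.out.println("body runs unguarded; close() is skipped");
        }
      }
    }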

