hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject hadoop git commit: HDFS-9445. Datanode may deadlock while handling a bad volume. Contributed by Walter Su.
Date Mon, 04 Jan 2016 14:23:02 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 0381556a4 -> 236a6ba13


HDFS-9445. Datanode may deadlock while handling a bad volume. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/236a6ba1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/236a6ba1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/236a6ba1

Branch: refs/heads/branch-2.6
Commit: 236a6ba13ea8a8cc449058892254b4e0f6e8ec84
Parents: 0381556
Author: Junping Du <junping_du@apache.org>
Authored: Mon Jan 4 06:22:40 2016 -0800
Committer: Junping Du <junping_du@apache.org>
Committed: Mon Jan 4 06:27:52 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 95 +++++++++++---------
 .../fsdataset/impl/TestFsDatasetImpl.java       |  4 +
 3 files changed, 62 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/236a6ba1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5784814..68ba352 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -24,6 +24,9 @@ Release 2.6.4 - UNRELEASED
 
   BUG FIXES
 
+    HDFS-9445. Datanode may deadlock while handling a bad volume.
+    (Walter Su via Kihwal)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/236a6ba1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e352ea3..7212432 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -385,51 +385,70 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   /**
    * Removes a collection of volumes from FsDataset.
    * @param volumes the root directories of the volumes.
-   *
-   * DataNode should call this function before calling
-   * {@link DataStorage#removeVolumes(java.util.Collection)}.
    */
   @Override
-  public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
+  public void removeVolumes(Collection<StorageLocation> volumes) {
     Set<String> volumeSet = new HashSet<String>();
     for (StorageLocation sl : volumes) {
       volumeSet.add(sl.getFile().getAbsolutePath());
     }
-    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
-      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-      String volume = sd.getRoot().getAbsolutePath();
-      if (volumeSet.contains(volume)) {
-        LOG.info("Removing " + volume + " from FsDataset.");
-
-        // Disable the volume from the service.
-        asyncDiskService.removeVolume(sd.getCurrentDir());
-        this.volumes.removeVolume(sd.getRoot());
-
-        // Removed all replica information for the blocks on the volume. Unlike
-        // updating the volumeMap in addVolume(), this operation does not scan
-        // disks.
-        for (String bpid : volumeMap.getBlockPoolList()) {
-          List<Block> blocks = new ArrayList<Block>();
-          for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
-              it.hasNext(); ) {
-            ReplicaInfo block = it.next();
-            String absBasePath =
+
+    Map<String, List<ReplicaInfo>> blkToInvalidate =
+        new HashMap<String, List<ReplicaInfo>>();
+    List<String> storageToRemove = new ArrayList<String>();
+    synchronized (this) {
+      for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+        Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+        String volume = sd.getRoot().getAbsolutePath();
+        if (volumeSet.contains(volume)) {
+          LOG.info("Removing " + volume + " from FsDataset.");
+
+          // Disable the volume from the service.
+          asyncDiskService.removeVolume(sd.getCurrentDir());
+          this.volumes.removeVolume(sd.getRoot());
+
+          // Remove all replica information for the blocks on the volume.
+          // Unlike updating the volumeMap in addVolume(), this operation does
+          // not scan disks.
+          for (String bpid : volumeMap.getBlockPoolList()) {
+            List<ReplicaInfo> blocks = new ArrayList<ReplicaInfo>();
+            for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
+                 it.hasNext(); ) {
+              ReplicaInfo block = it.next();
+              String absBasePath =
                   new File(block.getVolume().getBasePath()).getAbsolutePath();
-            if (absBasePath.equals(volume)) {
-              invalidate(bpid, block);
-              blocks.add(block);
-              it.remove();
+              if (absBasePath.equals(volume)) {
+                blocks.add(block);
+                it.remove();
+              }
             }
+            blkToInvalidate.put(bpid, blocks);
+            // Delete blocks from the block scanner in batch.
+            datanode.getBlockScanner().deleteBlocks(bpid,
+                blocks.toArray(new Block[blocks.size()]));
           }
-          // Delete blocks from the block scanner in batch.
-          datanode.getBlockScanner().deleteBlocks(bpid,
-              blocks.toArray(new Block[blocks.size()]));
+
+          storageToRemove.add(sd.getStorageUuid());
         }
+      }
+      setupAsyncLazyPersistThreads();
+    }
 
-        storageMap.remove(sd.getStorageUuid());
+    // Call this outside the lock.
+    for (Map.Entry<String, List<ReplicaInfo>> entry :
+        blkToInvalidate.entrySet()) {
+      String bpid = entry.getKey();
+      List<ReplicaInfo> blocks = entry.getValue();
+      for (ReplicaInfo block : blocks) {
+        invalidate(bpid, block);
+      }
+    }
+
+    synchronized (this) {
+      for(String storageUuid : storageToRemove) {
+        storageMap.remove(storageUuid);
       }
     }
-    setupAsyncLazyPersistThreads();
   }
 
   private StorageType getStorageTypeFromLocations(
@@ -1639,15 +1658,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public void invalidate(String bpid, ReplicaInfo block) {
     // If a DFSClient has the replica in its cache of short-circuit file
     // descriptors (and the client is using ShortCircuitShm), invalidate it.
-    // The short-circuit registry is null in the unit tests, because the
-    // datanode is mock object.
-    if (datanode.getShortCircuitRegistry() != null) {
-      datanode.getShortCircuitRegistry().processBlockInvalidation(
-          new ExtendedBlockId(block.getBlockId(), bpid));
+    datanode.getShortCircuitRegistry().processBlockInvalidation(
+        new ExtendedBlockId(block.getBlockId(), bpid));
 
-      // If the block is cached, start uncaching it.
-      cacheManager.uncacheBlock(bpid, block.getBlockId());
-    }
+    // If the block is cached, start uncaching it.
+    cacheManager.uncacheBlock(bpid, block.getBlockId());
 
     datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
         block.getStorageUuid());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/236a6ba1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 0a9776d..2846c71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -127,6 +128,9 @@ public class TestFsDatasetImpl {
     when(datanode.getConf()).thenReturn(conf);
     when(datanode.getDnConf()).thenReturn(dnConf);
     when(datanode.getBlockScanner()).thenReturn(scanner);
+    final ShortCircuitRegistry shortCircuitRegistry =
+        new ShortCircuitRegistry(conf);
+    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
 
     createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
     dataset = new FsDatasetImpl(datanode, storage, conf);


Mime
View raw message