hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kih...@apache.org
Subject hadoop git commit: HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.
Date Fri, 18 Mar 2016 15:33:34 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 73b5a44b0 -> 242c7f1fe


HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by
Rushabh Shah.

(cherry picked from commit 63c966a3fbeb675959fc4101e65de9f57aecd17d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/242c7f1f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/242c7f1f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/242c7f1f

Branch: refs/heads/branch-2.8
Commit: 242c7f1fee664b4d609a6c72e899f10816430f65
Parents: 73b5a44
Author: Kihwal Lee <kihwal@apache.org>
Authored: Fri Mar 18 10:33:13 2016 -0500
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Fri Mar 18 10:33:13 2016 -0500

----------------------------------------------------------------------
 .../hdfs/server/datanode/ReplicaInPipeline.java |  7 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 13 ++++
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  6 ++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 66 ++++++++++++++++++++
 4 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index d9406f0..5caca15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -183,6 +183,13 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.writer = writer;
   }
   
+  public void interruptThread() {
+    if (writer != null && writer != Thread.currentThread() 
+        && writer.isAlive()) {
+      this.writer.interrupt();
+    }
+  }
+
   @Override  // Object
   public boolean equals(Object o) {
     return super.equals(o);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 163c8d0..2f16fc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3152,5 +3152,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     evictLazyPersistBlocks(bytesNeeded);
     return cacheManager.reserve(bytesNeeded) > 0;
   }
+
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String blockPoolId : volumeMap.getBlockPoolList()) {
+      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (ReplicaInfo replicaInfo : replicas) {
+        if (replicaInfo instanceof ReplicaInPipeline
+            && replicaInfo.getVolume().equals(volume)) {
+          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+          replicaInPipeline.interruptThread();
+        }
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e02c293..ca7610d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -239,6 +239,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     Preconditions.checkState(reference.getReferenceCount() > 0);
   }
 
+  @VisibleForTesting
+  int getReferenceCount() {
+    return this.reference.getReferenceCount();
+  }
+
   /**
    * Close this volume.
    * @throws IOException if the volume is closed.
@@ -246,6 +251,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   void setClosed() throws IOException {
     try {
       this.reference.setClosed();
+      dataset.stopAllDataxceiverThreads(this);
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index ec526d3..67808bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -50,6 +55,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -525,4 +531,64 @@ public class TestFsDatasetImpl {
     LOG.info("Volumes removed");
     brReceivedLatch.await();
   }
+
+  /**
+   * Tests stopping all the active DataXceiver thread on volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so that datanode can interrupt and cluster clean up
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+    cluster.shutdown();
+    }
+  }
 }


Mime
View raw message