From: kihwal@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.
Date: Fri, 18 Mar 2016 15:33:34 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 73b5a44b0 -> 242c7f1fe


HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.

(cherry picked from commit 63c966a3fbeb675959fc4101e65de9f57aecd17d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/242c7f1f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/242c7f1f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/242c7f1f

Branch: refs/heads/branch-2.8
Commit: 242c7f1fee664b4d609a6c72e899f10816430f65
Parents: 73b5a44
Author: Kihwal Lee
Authored: Fri Mar 18 10:33:13 2016 -0500
Committer: Kihwal Lee
Committed: Fri Mar 18 10:33:13 2016 -0500

----------------------------------------------------------------------
 .../hdfs/server/datanode/ReplicaInPipeline.java |  7 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 13 ++++
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  6 ++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 66 ++++++++++++++++++++
 4 files changed, 92 insertions(+)
----------------------------------------------------------------------
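Note: the deadlock this change addresses works as follows. A DataNode volume cannot finish shutting down until every thread holding a reference to it lets go; a DataXceiver writer thread blocked in I/O never releases its reference, so the volume's close path waits forever. The patch interrupts such writers so the reference count can drain. Below is a minimal, self-contained sketch of that failure mode and of the fix; it assumes nothing from Hadoop, and all names in it (VolumeShutdownDemo, refCount) are illustrative only.

import java.util.concurrent.atomic.AtomicInteger;

public class VolumeShutdownDemo {
  // Stands in for the volume's reference count.
  static final AtomicInteger refCount = new AtomicInteger(0);

  public static void main(String[] args) throws InterruptedException {
    Thread writer = new Thread(() -> {
      refCount.incrementAndGet();        // writer pins the volume
      try {
        Thread.sleep(Long.MAX_VALUE);    // stands in for a stuck DataXceiver
      } catch (InterruptedException e) {
        // interrupted by shutdown; fall through and release
      } finally {
        refCount.decrementAndGet();      // reference released
      }
    });
    writer.start();
    Thread.sleep(100);                   // let the writer take its reference

    writer.interrupt();                  // the fix: interrupt stuck writers
    while (refCount.get() > 0) {         // shutdown waits for references
      Thread.sleep(10);
    }
    System.out.println("volume closed cleanly");
  }
}

Without the interrupt() call, the waiting loop above never exits; that is the blocked volume shutdown in the issue title.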
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index d9406f0..5caca15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -183,6 +183,13 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.writer = writer;
   }
 
+  public void interruptThread() {
+    if (writer != null && writer != Thread.currentThread()
+        && writer.isAlive()) {
+      this.writer.interrupt();
+    }
+  }
+
   @Override  // Object
   public boolean equals(Object o) {
     return super.equals(o);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 163c8d0..2f16fc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3152,5 +3152,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     evictLazyPersistBlocks(bytesNeeded);
     return cacheManager.reserve(bytesNeeded) > 0;
   }
+
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String blockPoolId : volumeMap.getBlockPoolList()) {
+      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (ReplicaInfo replicaInfo : replicas) {
+        if (replicaInfo instanceof ReplicaInPipeline
+            && replicaInfo.getVolume().equals(volume)) {
+          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+          replicaInPipeline.interruptThread();
+        }
+      }
+    }
+  }
 }
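Note: stopAllDataxceiverThreads() is a synchronized scan: while holding the dataset lock it walks every block pool's replicas, keeps only replicas still being written (ReplicaInPipeline) that live on the failing volume, and interrupts each one's writer through the guarded interruptThread() added above (non-null, not the calling thread, still alive). A self-contained sketch of the same filter-and-interrupt scan over plain Java collections follows; WriterRef, stopWriters, and the field names are hypothetical stand-ins, not Hadoop classes.

import java.util.List;
import java.util.Map;

public class InterruptScanDemo {
  static final class WriterRef {
    final String volumePath;  // which volume this writer is pinning
    final Thread writer;      // the DataXceiver-like thread; may be null
    WriterRef(String volumePath, Thread writer) {
      this.volumePath = volumePath;
      this.writer = writer;
    }
  }

  // Interrupt every live writer on the failing volume, across all pools,
  // mirroring the shape of stopAllDataxceiverThreads().
  static synchronized void stopWriters(
      Map<String, List<WriterRef>> byBlockPool, String failingVolume) {
    for (List<WriterRef> refs : byBlockPool.values()) {
      for (WriterRef ref : refs) {
        if (ref.volumePath.equals(failingVolume) && ref.writer != null
            && ref.writer != Thread.currentThread() && ref.writer.isAlive()) {
          ref.writer.interrupt();
        }
      }
    }
  }
}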
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e02c293..ca7610d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -239,6 +239,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     Preconditions.checkState(reference.getReferenceCount() > 0);
   }
 
+  @VisibleForTesting
+  int getReferenceCount() {
+    return this.reference.getReferenceCount();
+  }
+
   /**
    * Close this volume.
    * @throws IOException if the volume is closed.
@@ -246,6 +251,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   void setClosed() throws IOException {
     try {
       this.reference.setClosed();
+      dataset.stopAllDataxceiverThreads(this);
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }
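Note: the ordering inside setClosed() matters: this.reference.setClosed() first marks the volume closed so no new references can be taken, and only then are the remaining writers interrupted. An illustrative sketch of that close-then-interrupt ordering follows, with hypothetical names (VolumeCloseOrderDemo, register), not Hadoop's.

import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class VolumeCloseOrderDemo {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final List<Thread> writers = new ArrayList<>();

  // New writers are refused once the volume is closed.
  synchronized boolean register(Thread writer) {
    if (closed.get()) {
      return false;
    }
    writers.add(writer);
    return true;
  }

  // Close first, then interrupt stragglers, as setClosed() now does.
  synchronized void setClosed() throws ClosedChannelException {
    if (!closed.compareAndSet(false, true)) {
      throw new ClosedChannelException();  // mirrors the wrapped error path
    }
    for (Thread w : writers) {
      if (w != Thread.currentThread() && w.isAlive()) {
        w.interrupt();
      }
    }
  }
}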
http://git-wip-us.apache.org/repos/asf/hadoop/blob/242c7f1f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index ec526d3..67808bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -50,6 +55,7 @@ import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -525,4 +531,64 @@ public class TestFsDatasetImpl {
     LOG.info("Volumes removed");
     brReceivedLatch.await();
   }
+
+  /**
+   * Tests stopping all the active DataXceiver thread on volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so that datanode can interrupt and cluster clean up
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
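Note on the test: it provokes a volume failure by removing write and execute permission on the finalized directory while a write pipeline is still open, runs the synchronous disk check, and then asserts both that the volume's reference count drains to zero and that closing the dangling stream fails with the expected datanode in the message. The fixed Thread.sleep(1000) makes the assertion timing-sensitive; a bounded polling wait is a common hardening. A self-contained sketch of such a wait in plain Java is below (WaitUtil is a hypothetical helper; Hadoop's own GenericTestUtils.waitFor, already imported by this test class, plays the same role).

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class WaitUtil {
  private WaitUtil() {}

  // Poll every intervalMs until the condition holds, else time out.
  public static void waitFor(BooleanSupplier condition, long intervalMs,
      long timeoutMs) throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}

In the test above, WaitUtil.waitFor(() -> volume.getReferenceCount() == 0, 100, 10000) would replace the sleep-then-assert pair.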