From: cmccabe@apache.org
To: common-commits@hadoop.apache.org
Message-Id: <0212570283cd4526af5ac7568a00e005@git.apache.org>
Subject: hadoop git commit: HDFS-8496. Calling stopWriter() with FSDatasetImpl lock held may block other threads (cmccabe)
Date: Tue, 5 Apr 2016 01:10:27 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 3574b4559 -> 63fe2ae72


HDFS-8496. Calling stopWriter() with FSDatasetImpl lock held may block other threads (cmccabe)

(cherry picked from commit f6b1a818124cc42688c4c5acaf537d96cf00e43b)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

(cherry picked from commit d55af8b5a356e88a12f0b343b3ed3a974140f81d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63fe2ae7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63fe2ae7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63fe2ae7

Branch: refs/heads/branch-2.8
Commit: 63fe2ae72c502b5190c8ba5183a88d046e66ad07
Parents: 3574b45
Author: Colin Patrick Mccabe
Authored: Mon Apr 4 18:00:26 2016 -0700
Committer: Colin Patrick Mccabe
Committed: Mon Apr 4 18:07:21 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/ReplicaInPipeline.java |  54 ++++---
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 145 +++++++++++++------
 .../datanode/fsdataset/impl/ReplicaMap.java     |   2 +-
 .../hdfs/server/datanode/TestBlockRecovery.java | 137 ++++++++++++++++--
 4 files changed, 257 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/63fe2ae7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 5caca15..7326846 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -22,6 +22,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.RandomAccessFile; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -44,7 +45,7 @@ public class ReplicaInPipeline extends ReplicaInfo private long bytesAcked; private long bytesOnDisk; private byte[] lastChecksum; - private Thread writer; + private AtomicReference writer = new AtomicReference(); /** * Bytes reserved for this replica on the containing volume. @@ -97,7 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo super( blockId, len, genStamp, vol, dir); this.bytesAcked = len; this.bytesOnDisk = len; - this.writer = writer; + this.writer.set(writer); this.bytesReserved = bytesToReserve; this.originalBytesReserved = bytesToReserve; } @@ -110,7 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo super(from); this.bytesAcked = from.getBytesAcked(); this.bytesOnDisk = from.getBytesOnDisk(); - this.writer = from.writer; + this.writer.set(from.writer.get()); this.bytesReserved = from.bytesReserved; this.originalBytesReserved = from.originalBytesReserved; } @@ -175,18 +176,11 @@ public class ReplicaInPipeline extends ReplicaInfo return new ChunkChecksum(getBytesOnDisk(), lastChecksum); } - /** - * Set the thread that is writing to this replica - * @param writer a thread writing to this replica - */ - public void setWriter(Thread writer) { - this.writer = writer; - } - public void interruptThread() { - if (writer != null && writer != Thread.currentThread() - && writer.isAlive()) { - this.writer.interrupt(); + Thread thread = writer.get(); + if (thread != null && thread != Thread.currentThread() + && thread.isAlive()) { + thread.interrupt(); } } @@ -196,17 +190,35 @@ public class ReplicaInPipeline extends ReplicaInfo } /** + * Attempt to set the writer to a new value. + */ + public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) { + return writer.compareAndSet(prevWriter, newWriter); + } + + /** * Interrupt the writing thread and wait until it dies * @throws IOException the waiting is interrupted */ public void stopWriter(long xceiverStopTimeout) throws IOException { - if (writer != null && writer != Thread.currentThread() && writer.isAlive()) { - writer.interrupt(); + while (true) { + Thread thread = writer.get(); + if ((thread == null) || (thread == Thread.currentThread()) || + (!thread.isAlive())) { + if (writer.compareAndSet(thread, null) == true) { + return; // Done + } + // The writer changed. Go back to the start of the loop and attempt to + // stop the new writer. + continue; + } + thread.interrupt(); try { - writer.join(xceiverStopTimeout); - if (writer.isAlive()) { - final String msg = "Join on writer thread " + writer + " timed out"; - DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer)); + thread.join(xceiverStopTimeout); + if (thread.isAlive()) { + // Our thread join timed out. 
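The ReplicaInPipeline hunks above replace the mutable Thread writer field (and its setWriter() method) with an AtomicReference, presumably AtomicReference<Thread>, the type parameter having been stripped in the archived rendering. A recovering thread can then claim or release ownership of the writer with a single compare-and-set rather than relying on the FsDatasetImpl lock to serialize callers. A minimal sketch of that handshake, with names mirroring the patch but not taken verbatim from it:

import java.util.concurrent.atomic.AtomicReference;

// Sketch of the writer-ownership handshake introduced above.
class WriterSlot {
  private final AtomicReference<Thread> writer = new AtomicReference<>();

  /** Claim the writer slot, but only if it currently holds prevWriter. */
  boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
    return writer.compareAndSet(prevWriter, newWriter);
  }

  /** Interrupt the current writer if it is some other live thread. */
  void interruptThread() {
    Thread thread = writer.get();
    if (thread != null && thread != Thread.currentThread() && thread.isAlive()) {
      thread.interrupt();
    }
  }
}
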
+ final String msg = "Join on writer thread " + thread + " timed out"; + DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread)); throw new IOException(msg); } } catch (InterruptedException e) { @@ -214,7 +226,7 @@ public class ReplicaInPipeline extends ReplicaInfo } } } - + @Override // Object public int hashCode() { return super.hashCode(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/63fe2ae7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 2f16fc5..acc1353 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1210,8 +1210,20 @@ class FsDatasetImpl implements FsDatasetSpi { return newReplicaInfo; } + private static class MustStopExistingWriter extends Exception { + private final ReplicaInPipeline rip; + + MustStopExistingWriter(ReplicaInPipeline rip) { + this.rip = rip; + } + + ReplicaInPipeline getReplica() { + return rip; + } + } + private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, - long expectedBlockLen) throws IOException { + long expectedBlockLen) throws IOException, MustStopExistingWriter { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check state @@ -1235,9 +1247,9 @@ class FsDatasetImpl implements FsDatasetSpi { long replicaLen = replicaInfo.getNumBytes(); if (replicaInfo.getState() == ReplicaState.RBW) { ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; - // kill the previous writer - rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); - rbw.setWriter(Thread.currentThread()); + if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { + throw new MustStopExistingWriter(rbw); + } // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same if (replicaLen != rbw.getBytesOnDisk() || replicaLen != rbw.getBytesAcked()) { @@ -1263,39 +1275,55 @@ class FsDatasetImpl implements FsDatasetSpi { ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); - ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); + while (true) { + try { + synchronized (this) { + ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); - FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); - ReplicaBeingWritten replica; - try { - // change the replica's state/gs etc. - if (replicaInfo.getState() == ReplicaState.FINALIZED) { - replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, - newGS, b.getNumBytes()); - } else { //RBW - bumpReplicaGS(replicaInfo, newGS); - replica = (ReplicaBeingWritten) replicaInfo; + FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); + ReplicaBeingWritten replica; + try { + // change the replica's state/gs etc. 
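stopWriter(), whose tail appears just above, now spins on that reference instead of assuming it already owns the writer: it clears the slot with compareAndSet when there is nothing left to stop, and otherwise interrupts and joins the old writer before retrying. Continuing the WriterSlot sketch from the previous note (condensed; the committed method also warns with the writer's stack trace before throwing on timeout):

// Condensed form of the new lock-free stopWriter() loop.
void stopWriter(long joinTimeoutMs) throws IOException {
  while (true) {
    Thread thread = writer.get();
    if (thread == null || thread == Thread.currentThread() || !thread.isAlive()) {
      if (writer.compareAndSet(thread, null)) {
        return;               // slot cleared; nothing left to stop
      }
      continue;               // a new writer appeared; try to stop it too
    }
    thread.interrupt();
    try {
      thread.join(joinTimeoutMs);
      if (thread.isAlive()) {
        throw new IOException("Join on writer thread " + thread + " timed out");
      }
    } catch (InterruptedException e) {
      throw new IOException("Waiting for writer thread was interrupted.");
    }
  }
}
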
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) { + replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, + newGS, b.getNumBytes()); + } else { //RBW + bumpReplicaGS(replicaInfo, newGS); + replica = (ReplicaBeingWritten) replicaInfo; + } + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + return new ReplicaHandler(replica, ref); + } + } catch (MustStopExistingWriter e) { + e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout()); } - } catch (IOException e) { - IOUtils.cleanup(null, ref); - throw e; } - return new ReplicaHandler(replica, ref); } @Override // FsDatasetSpi public synchronized Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed close " + b); - // check replica's state - ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); - // bump the replica's GS - bumpReplicaGS(replicaInfo, newGS); - // finalize the replica if RBW - if (replicaInfo.getState() == ReplicaState.RBW) { - finalizeReplica(b.getBlockPoolId(), replicaInfo); + while (true) { + try { + synchronized (this) { + // check replica's state + ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); + // bump the replica's GS + bumpReplicaGS(replicaInfo, newGS); + // finalize the replica if RBW + if (replicaInfo.getState() == ReplicaState.RBW) { + finalizeReplica(b.getBlockPoolId(), replicaInfo); + } + return replicaInfo; + } + } catch (MustStopExistingWriter e) { + e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout()); + } } - return replicaInfo; } /** @@ -1387,26 +1415,37 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized ReplicaHandler recoverRbw( + public ReplicaHandler recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { LOG.info("Recover RBW replica " + b); - ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); - - // check the replica's state - if (replicaInfo.getState() != ReplicaState.RBW) { - throw new ReplicaNotFoundException( - ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); + while (true) { + try { + synchronized (this) { + ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); + + // check the replica's state + if (replicaInfo.getState() != ReplicaState.RBW) { + throw new ReplicaNotFoundException( + ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); + } + ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; + if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { + throw new MustStopExistingWriter(rbw); + } + LOG.info("Recovering " + rbw); + return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd); + } + } catch (MustStopExistingWriter e) { + e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout()); + } } - ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; - - LOG.info("Recovering " + rbw); - - // Stop the previous writer - rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); - rbw.setWriter(Thread.currentThread()); + } + private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw, + ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) + throws IOException { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1422,7 +1461,7 @@ class FsDatasetImpl implements FsDatasetSpi { long numBytes = rbw.getNumBytes(); if 
(bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){ throw new ReplicaNotFoundException("Unmatched length replica " + - replicaInfo + ": BytesAcked = " + bytesAcked + + rbw + ": BytesAcked = " + bytesAcked + " BytesRcvd = " + numBytes + " are not in the range of [" + minBytesRcvd + ", " + maxBytesRcvd + "]."); } @@ -2354,8 +2393,8 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized ReplicaRecoveryInfo initReplicaRecovery( - RecoveringBlock rBlock) throws IOException { + public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) + throws IOException { return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(), datanode.getDnConf().getXceiverStopTimeout()); @@ -2364,6 +2403,20 @@ class FsDatasetImpl implements FsDatasetSpi { /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, Block block, long recoveryId, long xceiverStopTimeout) throws IOException { + while (true) { + try { + synchronized (map.getMutex()) { + return initReplicaRecoveryImpl(bpid, map, block, recoveryId); + } + } catch (MustStopExistingWriter e) { + e.getReplica().stopWriter(xceiverStopTimeout); + } + } + } + + static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map, + Block block, long recoveryId) + throws IOException, MustStopExistingWriter { final ReplicaInfo replica = map.get(bpid, block.getBlockId()); LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId + ", replica=" + replica); @@ -2376,7 +2429,9 @@ class FsDatasetImpl implements FsDatasetSpi { //stop writer if there is any if (replica instanceof ReplicaInPipeline) { final ReplicaInPipeline rip = (ReplicaInPipeline)replica; - rip.stopWriter(xceiverStopTimeout); + if (!rip.attemptToSetWriter(null, Thread.currentThread())) { + throw new MustStopExistingWriter(rip); + } //check replica bytes on disk. 
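The FsDatasetImpl changes above all apply one pattern: the dataset lock is taken only around a short critical section, inside which the method tries to claim the replica's writer with attemptToSetWriter(null, Thread.currentThread()). If another thread still owns the writer, the new checked MustStopExistingWriter exception carries the replica out of the synchronized block, the old writer is stopped with no lock held, and the whole operation retries; recoverRbw and initReplicaRecovery also drop their synchronized modifier in favor of this inner block. A schematic of the loop, where findReplica(), doRecovery() and xceiverStopTimeoutMs are hypothetical stand-ins for the per-method details of recoverAppend, recoverClose, recoverRbw and initReplicaRecovery:

// Schematic retry loop; the helpers are placeholders, not patch code.
ReplicaHandler recoverSomething(ExtendedBlock b) throws IOException {
  while (true) {
    try {
      synchronized (this) {              // dataset lock held only briefly
        ReplicaInPipeline rip = findReplica(b);
        if (!rip.attemptToSetWriter(null, Thread.currentThread())) {
          // Another thread is still writing; do not join it under the lock.
          throw new MustStopExistingWriter(rip);
        }
        return doRecovery(rip);
      }
    } catch (MustStopExistingWriter e) {
      // Slow path, executed with no lock held.
      e.getReplica().stopWriter(xceiverStopTimeoutMs);
    }
  }
}

The motivation is that Thread.join() inside stopWriter() can wait up to the xceiver stop timeout; under the old code that wait happened with the FsDatasetImpl lock held, stalling every other thread that needed the dataset, which is the behaviour named in the JIRA title.
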
if (rip.getBytesOnDisk() < rip.getVisibleLength()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/63fe2ae7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index 6f0b8a7..0d1b787 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -220,7 +220,7 @@ class ReplicaMap { * Give access to mutex used for synchronizing ReplicasMap * @return object used as lock */ - Object getMutext() { + Object getMutex() { return mutex; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63fe2ae7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index b4f3058..41807e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -40,10 +40,12 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Iterators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -86,7 +88,9 @@ import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -116,12 +120,18 @@ public class TestBlockRecovery { private final static long REPLICA_LEN2 = 5000L; private final static ExtendedBlock block = new ExtendedBlock(POOL_ID, BLOCK_ID, BLOCK_LEN, GEN_STAMP); - + + @Rule + public TestName currentTestName = new TestName(); + static { GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.ALL); GenericTestUtils.setLogLevel(LOG, Level.ALL); } + private final long + TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L; + /** * Starts an instance of DataNode * @throws IOException @@ -133,6 +143,12 @@ public class TestBlockRecovery { conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); + if (currentTestName.getMethodName().equals( + "testInitReplicaRecoveryDoesNotHogLock")) { + // This test requires a very long value for the xceiver stop timeout. 
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, + TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS); + } conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); FileSystem.setDefaultUri(conf, "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort()); @@ -265,7 +281,7 @@ public class TestBlockRecovery { * Two replicas are in Finalized state * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testFinalizedReplicas () throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -304,7 +320,7 @@ public class TestBlockRecovery { * One replica is Finalized and another is RBW. * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testFinalizedRbwReplicas() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -345,7 +361,7 @@ public class TestBlockRecovery { * One replica is Finalized and another is RWR. * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testFinalizedRwrReplicas() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -387,7 +403,7 @@ public class TestBlockRecovery { * Two replicas are RBW. * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testRBWReplicas() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -411,7 +427,7 @@ public class TestBlockRecovery { * One replica is RBW and another is RWR. * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testRBW_RWRReplicas() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -436,7 +452,7 @@ public class TestBlockRecovery { * Two replicas are RWR. 
* @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testRWRReplicas() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -472,7 +488,7 @@ public class TestBlockRecovery { * @throws IOException * in case of an error */ - @Test + @Test(timeout=60000) public void testRecoveryInProgressException() throws IOException, InterruptedException { if(LOG.isDebugEnabled()) { @@ -497,7 +513,7 @@ public class TestBlockRecovery { * @throws IOException * in case of an error */ - @Test + @Test(timeout=60000) public void testErrorReplicas() throws IOException, InterruptedException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -524,7 +540,7 @@ public class TestBlockRecovery { * * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testZeroLenReplicas() throws IOException, InterruptedException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -564,7 +580,7 @@ public class TestBlockRecovery { * * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testFailedReplicaUpdate() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -586,7 +602,7 @@ public class TestBlockRecovery { * * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testNoReplicaUnderRecovery() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -611,7 +627,7 @@ public class TestBlockRecovery { * * @throws IOException in case of an error */ - @Test + @Test(timeout=60000) public void testNotMatchedReplicaID() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -712,7 +728,7 @@ public class TestBlockRecovery { * throw an exception. * @throws Exception */ - @Test + @Test(timeout=60000) public void testRURReplicas() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); @@ -742,4 +758,97 @@ public class TestBlockRecovery { assertTrue(exceptionThrown); } } + + /** + * Test that initReplicaRecovery does not hold the lock for an unreasonable + * amount of time if a writer is taking a long time to stop. + */ + @Test(timeout=60000) + public void testInitReplicaRecoveryDoesNotHogLock() throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("Running " + GenericTestUtils.getMethodName()); + } + // We need a long value for the data xceiver stop timeout. + // Otherwise the timeout will trigger, and we will not have tested that + // thread join was done locklessly. + Assert.assertEquals( + TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS, + dn.getDnConf().getXceiverStopTimeout()); + final Semaphore progressParent = new Semaphore(0); + final Semaphore terminateSlowWorker = new Semaphore(0); + final AtomicBoolean failure = new AtomicBoolean(false); + Collection recoveringBlocks = + initRecoveringBlocks(); + final RecoveringBlock recoveringBlock = + Iterators.get(recoveringBlocks.iterator(), 0); + final ExtendedBlock block = recoveringBlock.getBlock(); + Thread slowWorker = new Thread(new Runnable() { + @Override + public void run() { + try { + // Register this thread as the writer for the recoveringBlock. 
+ LOG.debug("slowWorker creating rbw"); + ReplicaHandler replicaHandler = + spyDN.data.createRbw(StorageType.DISK, block, false); + replicaHandler.close(); + LOG.debug("slowWorker created rbw"); + // Tell the parent thread to start progressing. + progressParent.release(); + while (true) { + try { + terminateSlowWorker.acquire(); + break; + } catch (InterruptedException e) { + // Ignore interrupted exceptions so that the waitingWorker thread + // will have to wait for us. + } + } + LOG.debug("slowWorker exiting"); + } catch (Throwable t) { + LOG.error("slowWorker got exception", t); + failure.set(true); + } + } + }); + // Start the slow worker thread and wait for it to take ownership of the + // ReplicaInPipeline + slowWorker.start(); + while (true) { + try { + progressParent.acquire(); + break; + } catch (InterruptedException e) { + // Ignore interrupted exceptions + } + } + + // Start a worker thread which will wait for the slow worker thread. + Thread waitingWorker = new Thread(new Runnable() { + @Override + public void run() { + try { + // Attempt to terminate the other worker thread and take ownership + // of the ReplicaInPipeline. + LOG.debug("waitingWorker initiating recovery"); + spyDN.initReplicaRecovery(recoveringBlock); + LOG.debug("waitingWorker initiated recovery"); + } catch (Throwable t) { + GenericTestUtils.assertExceptionContains("meta does not exist", t); + } + } + }); + waitingWorker.start(); + + // Do an operation that requires the lock. This should not be blocked + // by the replica recovery in progress. + spyDN.getFSDataset().getReplicaString( + recoveringBlock.getBlock().getBlockPoolId(), + recoveringBlock.getBlock().getBlockId()); + + // Wait for the two worker threads to exit normally. + terminateSlowWorker.release(); + slowWorker.join(); + waitingWorker.join(); + Assert.assertFalse("The slowWriter thread failed.", failure.get()); + } }
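
The new testInitReplicaRecoveryDoesNotHogLock exercises exactly that scenario: a slow writer thread registers itself against a replica and refuses to exit, a second thread starts initReplicaRecovery() (which must wait out that writer), and the main thread then issues getReplicaString(), an unrelated call that needs the dataset lock and would hang if recovery were still joining the writer under it. The xceiver stop timeout is set very high so a join timeout cannot mask a lock-hogging regression. Stripped of the DataNode plumbing, the thread choreography looks roughly like the following sketch, where comments name the real calls that the placeholders stand in for:

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Rough shape of the test's thread choreography; the real test drives
// DataNode APIs where the comments name them.
public class LockHogChoreography {
  public static void main(String[] args) throws InterruptedException {
    final Semaphore writerReady = new Semaphore(0);    // slow writer -> main
    final Semaphore letWriterExit = new Semaphore(0);  // main -> slow writer
    final AtomicBoolean failure = new AtomicBoolean(false);

    Thread slowWriter = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          // (real test: createRbw() registers this thread as the writer)
          writerReady.release();
          letWriterExit.acquireUninterruptibly();      // hold ownership
        } catch (Throwable t) {
          failure.set(true);
        }
      }
    });
    slowWriter.start();
    writerReady.acquireUninterruptibly();

    Thread recoverer = new Thread(new Runnable() {
      @Override
      public void run() {
        // (real test: initReplicaRecovery(); must stop slowWriter, but
        //  must not join it while holding the dataset-wide lock)
      }
    });
    recoverer.start();

    // (real test: getReplicaString(); an unrelated lock user that must
    //  not be blocked by the recovery above)

    letWriterExit.release();
    slowWriter.join();
    recoverer.join();
    if (failure.get()) {
      throw new AssertionError("slow writer failed");
    }
  }
}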