From: jing9@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Thu, 08 Oct 2015 21:17:46 -0000
Subject: [35/35] hadoop git commit: HDFS-8967. Create a BlockManagerLock class to represent the lock used in the BlockManager. Contributed by Haohui Mai.

HDFS-8967. Create a BlockManagerLock class to represent the lock used in
the BlockManager. Contributed by Haohui Mai.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b645d67a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b645d67a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b645d67a

Branch: refs/heads/HDFS-8966
Commit: b645d67a91087c4f71df2c17498bd9923c7a40f0
Parents: 118a35b
Author: Jing Zhao
Authored: Tue Oct 6 23:35:24 2015 -0700
Committer: Jing Zhao
Committed: Thu Oct 8 14:15:14 2015 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 124 ++++++++++++-------
 .../blockmanagement/BlockManagerLock.java       |  50 ++++++++
 .../CacheReplicationMonitor.java                |   9 +-
 .../server/blockmanagement/DatanodeManager.java |  10 +-
 .../blockmanagement/DecommissionManager.java    |   4 +-
 .../blockmanagement/HeartbeatManager.java       |   8 +-
 .../hdfs/server/namenode/CacheManager.java      |   4 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   5 +
 .../hadoop/hdfs/server/namenode/Namesystem.java |   4 +
 .../blockmanagement/BlockManagerTestUtil.java   |  15 ++-
 .../blockmanagement/TestBlockManager.java       |  25 ++--
 .../blockmanagement/TestDatanodeManager.java    |   2 +
 .../blockmanagement/TestReplicationPolicy.java  |  22 +++-
 .../datanode/TestDataNodeVolumeFailure.java     |   3 +-
 14 files changed, 199 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
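The heart of the change is small: BlockManager now implements RwLock itself,
backed by a BlockManagerLock that, for this first step, still wraps the
namesystem's coarse ReentrantReadWriteLock, so behavior is unchanged while
call sites migrate. A minimal standalone sketch of that delegation pattern
follows; LockProvider and DelegatingLock are illustrative stand-ins, not
classes from this patch:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the delegation idea, assuming a provider that exposes the
    // shared ReentrantReadWriteLock (in the patch this role is played by
    // Namesystem.getLockImplementation()).
    class LockProvider {
      final ReentrantReadWriteLock coarseLock = new ReentrantReadWriteLock();
    }

    class DelegatingLock {
      private final ReentrantReadWriteLock coarseLock;

      DelegatingLock(LockProvider provider) {
        // Same object as the provider's lock: acquiring here is acquiring there.
        this.coarseLock = provider.coarseLock;
      }

      void writeLock()   { coarseLock.writeLock().lock(); }
      void writeUnlock() { coarseLock.writeLock().unlock(); }

      boolean hasWriteLock() {
        return coarseLock.isWriteLockedByCurrentThread();
      }

      boolean hasReadLock() {
        // The write lock subsumes the read lock for assertion purposes.
        return hasWriteLock() || coarseLock.getReadHoldCount() > 0;
      }
    }

Because the wrapped lock is the namesystem's own lock object, swapping the
implementation later only has to change what the provider hands out.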
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 18bfc41..35e3c1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
+import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +113,7 @@ import org.slf4j.LoggerFactory;
  * Keeps information related to the blocks stored in the Hadoop cluster.
  */
 @InterfaceAudience.Private
-public class BlockManager implements BlockStatsMXBean {
+public class BlockManager implements RwLock, BlockStatsMXBean {
   public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
   public static final Logger blockLog = NameNode.blockStateChangeLog;
@@ -125,6 +126,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final Namesystem namesystem;
 
+  private final BlockManagerLock lock;
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
@@ -299,6 +301,7 @@ public class BlockManager implements BlockStatsMXBean {
   public BlockManager(final Namesystem namesystem, final Configuration conf)
     throws IOException {
     this.namesystem = namesystem;
+    this.lock = new BlockManagerLock(namesystem);
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
@@ -518,7 +521,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     datanodeManager.fetchDatanodes(live, dead, false);
@@ -550,7 +553,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
-  
+
   /**
    * Dump the metadata for the given block in a human-readable
    * form.
@@ -579,12 +582,12 @@ public class BlockManager implements BlockStatsMXBean {
       out.print(fileName + ": ");
     }
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
-    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
               " (replicas:" +
               " l: " + numReplicas.liveReplicas() +
              " d: " + numReplicas.decommissionedAndDecommissioning() +
               " c: " + numReplicas.corruptReplicas() +
-              " e: " + numReplicas.excessReplicas() + ") "); 
+              " e: " + numReplicas.excessReplicas() + ") ");
 
     Collection<DatanodeDescriptor> corruptNodes =
                                   corruptReplicas.getNodes(block);
@@ -929,7 +932,7 @@ public class BlockManager implements BlockStatsMXBean {
       final boolean inSnapshot, FileEncryptionInfo feInfo,
       ErasureCodingPolicy ecPolicy)
       throws IOException {
-    assert namesystem.hasReadLock();
+    assert hasReadLock();
     if (blocks == null) {
       return null;
     } else if (blocks.length == 0) {
@@ -963,6 +966,41 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  @Override
+  public void readLock() {
+    lock.readLock().lock();
+  }
+
+  @Override
+  public void readUnlock() {
+    lock.readLock().unlock();
+  }
+
+  @Override
+  public boolean hasReadLock() {
+    return lock.hasReadLock();
+  }
+
+  @Override
+  public boolean hasWriteLock() {
+    return lock.hasWriteLock();
+  }
+
+  @Override
+  public void writeLock() {
+    lock.writeLock().lock();
+  }
+
+  @Override
+  public void writeLockInterruptibly() throws InterruptedException {
+    lock.writeLock().lockInterruptibly();
+  }
+
+  @Override
+  public void writeUnlock() {
+    lock.writeLock().unlock();
+  }
+
   /** @return current access keys. */
   public ExportedBlockKeys getBlockKeys() {
     return isBlockTokenEnabled()? blockTokenSecretManager.exportKeys()
@@ -1078,12 +1116,12 @@ public class BlockManager implements BlockStatsMXBean {
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size
       ) throws IOException {
     namesystem.checkOperation(OperationCategory.READ);
-    namesystem.readLock();
+    readLock();
     try {
       namesystem.checkOperation(OperationCategory.READ);
       return getBlocksWithLocations(datanode, size);
     } finally {
-      namesystem.readUnlock();
+      readUnlock();
     }
   }
@@ -1146,7 +1184,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
@@ -1223,7 +1261,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String storageID, String reason) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final Block reportedBlock = blk.getLocalBlock();
     final BlockInfo storedBlock = getStoredBlock(reportedBlock);
     if (storedBlock == null) {
@@ -1401,13 +1439,13 @@ public class BlockManager implements BlockStatsMXBean {
    */
   int computeBlockRecoveryWork(int blocksToProcess) {
     List<List<BlockInfo>> blocksToReplicate = null;
-    namesystem.writeLock();
+    writeLock();
     try {
       // Choose the blocks to be replicated
       blocksToReplicate = neededReplications
           .chooseUnderReplicatedBlocks(blocksToProcess);
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     return computeRecoveryWorkForBlocks(blocksToReplicate);
   }
@@ -1430,7 +1468,7 @@ public class BlockManager implements BlockStatsMXBean {
     List<BlockRecoveryWork> recovWork = new LinkedList<>();
 
     // Step 1: categorize at-risk blocks into replication and EC tasks
-    namesystem.writeLock();
+    writeLock();
     try {
       synchronized (neededReplications) {
         for (int priority = 0; priority < blocksToRecover.size(); priority++) {
@@ -1443,7 +1481,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     // Step 2: choose target nodes for each recovery task
@@ -1465,7 +1503,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Step 3: add tasks to the DN
-    namesystem.writeLock();
+    writeLock();
     try {
       for(BlockRecoveryWork rw : recovWork){
         final DatanodeStorageInfo[] targets = rw.getTargets();
@@ -1481,7 +1519,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     if (blockLog.isInfoEnabled()) {
@@ -1851,7 +1889,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void processPendingReplications() {
     BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
-      namesystem.writeLock();
+      writeLock();
       try {
         for (int i = 0; i < timedOutItems.length; i++) {
           /*
         }
       }
      } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
      }
      /* If we know the target datanodes where the replication timedout,
       * we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1878,7 +1916,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
-    assert namesystem.hasReadLock();
+    assert hasReadLock();
     DatanodeDescriptor node = null;
     try {
       node = datanodeManager.getDatanode(nodeReg);
@@ -1939,7 +1977,7 @@ public class BlockManager implements BlockStatsMXBean {
       final DatanodeStorage storage,
       final BlockListAsLongs newReport, BlockReportContext context,
       boolean lastStorageInRpc) throws IOException {
-    namesystem.writeLock();
+    writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
     DatanodeDescriptor node;
@@ -2015,7 +2053,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     if (invalidatedBlocks != null) {
@@ -2042,7 +2080,7 @@ public class BlockManager implements BlockStatsMXBean {
     LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
              "longer exists on the DataNode.",
              Long.toHexString(context.getReportId()), zombie.getStorageID());
-    assert(namesystem.hasWriteLock());
+    assert hasWriteLock();
     Iterator<BlockInfo> iter = zombie.getBlockIterator();
     int prevBlocks = zombie.numBlocks();
     while (iter.hasNext()) {
@@ -2076,7 +2114,7 @@ public class BlockManager implements BlockStatsMXBean {
     long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     long startPostponedMisReplicatedBlocksCount =
         getPostponedMisreplicatedBlocksCount();
-    namesystem.writeLock();
+    writeLock();
     try {
       // blocksPerRescan is the configured number of blocks per rescan.
       // Randomly select blocksPerRescan consecutive blocks from the HashSet
@@ -2129,7 +2167,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
       long endPostponedMisReplicatedBlocksCount =
           getPostponedMisreplicatedBlocksCount();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
@@ -2191,7 +2229,7 @@ public class BlockManager implements BlockStatsMXBean {
       BlockInfo block, long oldGenerationStamp, long oldNumBytes,
       DatanodeStorageInfo[] newStorages)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
       b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
@@ -2239,7 +2277,7 @@ public class BlockManager implements BlockStatsMXBean {
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
-    assert (namesystem.hasWriteLock());
+    assert (hasWriteLock());
     assert (storageInfo.getBlockReportCount() == 0);
 
     for (BlockReportReplica iblk : report) {
@@ -2667,7 +2705,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
   throws IOException {
-    assert (storedBlock != null && namesystem.hasWriteLock());
+    assert (storedBlock != null && hasWriteLock());
     if (!namesystem.isInStartupSafeMode()
         || isPopulatingReplQueues()) {
       addStoredBlock(storedBlock, reported, storageInfo, null, false);
@@ -2702,7 +2740,7 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeDescriptor delNodeHint,
       boolean logEveryBlock)
   throws IOException {
-    assert block != null && namesystem.hasWriteLock();
+    assert block != null && hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (!block.isComplete()) {
@@ -2863,7 +2901,7 @@ public class BlockManager implements BlockStatsMXBean {
    * over or under replicated. Place it into the respective queue.
    */
   public void processMisReplicatedBlocks() {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     stopReplicationInitializer();
     neededReplications.clear();
     replicationQueuesInitializer = new Daemon() {
@@ -2920,7 +2958,7 @@ public class BlockManager implements BlockStatsMXBean {
     while (namesystem.isRunning() &&
         !Thread.currentThread().isInterrupted()) {
       int processed = 0;
-      namesystem.writeLockInterruptibly();
+      writeLockInterruptibly();
       try {
         while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
           BlockInfo block = blocksItr.next();
@@ -2974,7 +3012,7 @@ public class BlockManager implements BlockStatsMXBean {
           break;
         }
       } finally {
-        namesystem.writeUnlock();
+        writeUnlock();
         // Make sure it is out of the write lock for sufficiently long time.
         Thread.sleep(sleepDuration);
       }
@@ -3072,7 +3110,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void processOverReplicatedBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
@@ -3110,7 +3148,7 @@ public class BlockManager implements BlockStatsMXBean {
       BlockInfo storedBlock, short replication,
       DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(storedBlock);
     if (storedBlock.isStriped()) {
@@ -3296,7 +3334,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
         dn.getDatanodeUuid());
     if (excessBlocks == null) {
@@ -3327,7 +3365,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
     blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
-    assert (namesystem.hasWriteLock());
+    assert hasWriteLock();
     {
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
         blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
@@ -3504,7 +3542,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
@@ -3704,7 +3742,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void removeBlock(BlockInfo block) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     // No need to ACK blocks that are being removed entirely
     // from the namespace, since the removal of the associated
     // file already removes them from the block map below.
@@ -3738,7 +3776,7 @@ public class BlockManager implements BlockStatsMXBean {
   /** updates a block in under replication queue */
   private void updateNeededReplications(final BlockInfo block,
       final int curReplicasDelta, int expectedReplicasDelta) {
-    namesystem.writeLock();
+    writeLock();
     try {
       if (!isPopulatingReplQueues()) {
         return;
@@ -3756,7 +3794,7 @@ public class BlockManager implements BlockStatsMXBean {
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
   }
@@ -3816,7 +3854,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
-    namesystem.writeLock();
+    writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode()) {
@@ -3840,7 +3878,7 @@ public class BlockManager implements BlockStatsMXBean {
         return 0;
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
         dn, toInvalidate);
@@ -4093,12 +4131,12 @@ public class BlockManager implements BlockStatsMXBean {
     int workFound = this.computeBlockRecoveryWork(blocksToProcess);
 
     // Update counters
-    namesystem.writeLock();
+    writeLock();
     try {
       this.updateState();
       this.scheduledReplicationBlocksCount = workFound;
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     workFound += this.computeInvalidateWork(nodesToProcess);
     return workFound;
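
Every BlockManager call site above keeps the same try/finally shape; only the
receiver changes, from namesystem.writeLock() to the manager's own methods. A
compilable sketch of that shape, where Worker is a stand-in and not a class
from this patch:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative call-site shape after the patch: the manager locks itself.
    class Worker {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      void writeLock()   { lock.writeLock().lock(); }
      void writeUnlock() { lock.writeLock().unlock(); }

      void processSomething() {
        writeLock();              // was: namesystem.writeLock()
        try {
          // mutate block maps, replication queues, etc.
        } finally {
          writeUnlock();          // was: namesystem.writeUnlock()
        }
      }
    }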
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
new file mode 100644
index 0000000..18dc201
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class BlockManagerLock implements ReadWriteLock {
+  private final ReentrantReadWriteLock coarseLock;
+
+  BlockManagerLock(Namesystem ns) {
+    this.coarseLock = ns.getLockImplementation();
+  }
+
+  @Override
+  public Lock readLock() {
+    return coarseLock.readLock();
+  }
+
+  @Override
+  public Lock writeLock() {
+    return coarseLock.writeLock();
+  }
+
+  boolean hasReadLock() {
+    return hasWriteLock() || coarseLock.getReadHoldCount() > 0;
+  }
+
+  boolean hasWriteLock() {
+    return coarseLock.isWriteLockedByCurrentThread();
+  }
+}
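
Note the asymmetry in the hold checks: hasReadLock() answers true whenever the
current thread holds either the read or the write lock, because the write lock
subsumes read access. That behavior comes straight from java.util.concurrent;
a small demo using only the standard API:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Demonstrates the hold-count queries BlockManagerLock relies on.
    public class HoldCountDemo {
      public static void main(String[] args) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

        rw.writeLock().lock();
        // While the write lock is held, "has read access" should be implied.
        System.out.println(rw.isWriteLockedByCurrentThread()); // true
        rw.writeLock().unlock();

        rw.readLock().lock();
        System.out.println(rw.getReadHoldCount() > 0);         // true
        System.out.println(rw.isWriteLockedByCurrentThread()); // false
        rw.readLock().unlock();
      }
    }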
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 2f81ddf..54bcffd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -218,7 +217,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * after are not atomic.
    */
   public void waitForRescanIfNeeded() {
-    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+    Preconditions.checkArgument(!blockManager.hasWriteLock(),
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
@@ -263,7 +262,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    */
   @Override
   public void close() throws IOException {
-    Preconditions.checkArgument(namesystem.hasWriteLock());
+    Preconditions.checkArgument(blockManager.hasWriteLock());
     lock.lock();
     try {
       if (shutdown) return;
@@ -285,7 +284,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     scannedDirectives = 0;
     scannedBlocks = 0;
     try {
-      namesystem.writeLock();
+      blockManager.writeLock();
       try {
         lock.lock();
         if (shutdown) {
@@ -302,7 +301,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
         rescanCachedBlockMap();
         blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
       } finally {
-        namesystem.writeUnlock();
+        blockManager.writeUnlock();
       }
     }
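
The rescan path above takes two locks in a fixed order, the BlockManager write
lock first and then the CRM's own lock, which is why waitForRescanIfNeeded()
insists the caller does not hold the write lock: a waiter holding it could
deadlock against the rescan thread. A compilable sketch of that ordering, with
illustrative class and field names rather than the patch's own:

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the two-lock ordering: the rescan thread always takes the
    // coarse write lock before the monitor's own lock.
    class RescanOrdering {
      private final ReentrantReadWriteLock coarse = new ReentrantReadWriteLock();
      private final ReentrantLock crmLock = new ReentrantLock();

      void rescan() {
        coarse.writeLock().lock();      // lock 1: blockManager.writeLock()
        try {
          crmLock.lock();               // lock 2: the CRM lock
          try {
            // scan and schedule caching work
          } finally {
            crmLock.unlock();
          }
        } finally {
          coarse.writeLock().unlock();
        }
      }

      void waitForRescan() {
        // Holding the write lock here could deadlock against rescan(),
        // hence the Preconditions.checkArgument(!hasWriteLock(), ...) above.
        assert !coarse.isWriteLockedByCurrentThread();
      }
    }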
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 28ab716..97e4a94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -540,7 +540,7 @@ public class DatanodeManager {
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
-    assert namesystem.hasWriteLock();
+    assert blockManager.hasWriteLock();
     heartbeatManager.removeDatanode(nodeInfo);
     blockManager.removeBlocksAssociatedTo(nodeInfo);
     networktopology.remove(nodeInfo);
@@ -559,7 +559,7 @@ public class DatanodeManager {
    */
   public void removeDatanode(final DatanodeID node
       ) throws UnregisteredNodeException {
-    namesystem.writeLock();
+    blockManager.writeLock();
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
       if (descriptor != null) {
@@ -569,7 +569,7 @@ public class DatanodeManager {
             + node + " does not exist");
       }
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
@@ -993,12 +993,12 @@ public class DatanodeManager {
    */
   public void refreshNodes(final Configuration conf) throws IOException {
     refreshHostsReader(conf);
-    namesystem.writeLock();
+    blockManager.writeLock();
     try {
       refreshDatanodes();
       countSoftwareVersions();
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index fb86ff3..1172ef3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -366,12 +366,12 @@ public class DecommissionManager {
       numBlocksChecked = 0;
       numNodesChecked = 0;
       // Check decom progress
-      namesystem.writeLock();
+      blockManager.writeLock();
       try {
         processPendingNodes();
         check();
       } finally {
-        namesystem.writeUnlock();
+        blockManager.writeUnlock();
       }
       if (numBlocksChecked + numNodesChecked > 0) {
         LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index f2e9827..95adbe4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -356,7 +356,7 @@ class HeartbeatManager implements DatanodeStatistics {
       allAlive = dead == null && failedStorage == null;
       if (dead != null) {
         // acquire the fsnamesystem lock, and then remove the dead node.
-        namesystem.writeLock();
+        blockManager.writeLock();
         try {
           if (namesystem.isInStartupSafeMode()) {
             return;
@@ -365,12 +365,12 @@ class HeartbeatManager implements DatanodeStatistics {
             dm.removeDeadDatanode(dead);
           }
         } finally {
-          namesystem.writeUnlock();
+          blockManager.writeUnlock();
         }
       }
       if (failedStorage != null) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
-        namesystem.writeLock();
+        blockManager.writeLock();
         try {
           if (namesystem.isInStartupSafeMode()) {
             return;
@@ -379,7 +379,7 @@ class HeartbeatManager implements DatanodeStatistics {
             blockManager.removeBlocksAssociatedTo(failedStorage);
           }
         } finally {
-          namesystem.writeUnlock();
+          blockManager.writeUnlock();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 3559065..11797d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -928,7 +928,7 @@ public final class CacheManager {
 
   public final void processCacheReport(final DatanodeID datanodeID,
       final List<Long> blockIds) throws IOException {
-    namesystem.writeLock();
+    blockManager.writeLock();
     final long startTime = Time.monotonicNow();
     final long endTime;
     try {
@@ -942,7 +942,7 @@ public final class CacheManager {
       processCacheReportImpl(datanode, blockIds);
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
 
     // Log the block report processing stats from Namenode perspective

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index a5b6c77..1ee15a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6216,6 +6216,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return haContext;
   }
 
+  @Override
+  public ReentrantReadWriteLock getLockImplementation() {
+    return fsLock.coarseLock;
+  }
+
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
     List<String> list = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index b1012c2..89fe678 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /** Namesystem operations. */
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
@@ -67,4 +69,6 @@ public interface Namesystem extends RwLock, SafeMode {
   CacheManager getCacheManager();
 
   HAContext getHAContext();
+
+  ReentrantReadWriteLock getLockImplementation();
 }
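
FSNamesystem satisfies the new Namesystem method by handing out
fsLock.coarseLock, the same ReentrantReadWriteLock it uses internally, so
BlockManagerLock and the namesystem still serialize against one another. A
minimal sketch of that contract, where LockSource and SimpleNamesystem are
stand-ins for Namesystem and FSNamesystem:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical minimal pair showing how an implementation can satisfy
    // getLockImplementation() by exposing its own coarse lock.
    interface LockSource {
      ReentrantReadWriteLock getLockImplementation();
    }

    class SimpleNamesystem implements LockSource {
      private final ReentrantReadWriteLock coarseLock =
          new ReentrantReadWriteLock();

      @Override
      public ReentrantReadWriteLock getLockImplementation() {
        return coarseLock;
      }
    }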
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 64d80bd..1ae6c2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -172,10 +172,10 @@ public class BlockManagerTestUtil {
    * @param dnName the name of the DataNode
    */
   public static void noticeDeadDatanode(NameNode nn, String dnName) {
-    FSNamesystem namesystem = nn.getNamesystem();
-    namesystem.writeLock();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    bm.writeLock();
     try {
-      DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+      DatanodeManager dnm = bm.getDatanodeManager();
       HeartbeatManager hbm = dnm.getHeartbeatManager();
       DatanodeDescriptor[] dnds = hbm.getDatanodes();
       DatanodeDescriptor theDND = null;
@@ -191,7 +191,7 @@ public class BlockManagerTestUtil {
         hbm.heartbeatCheck();
       }
     } finally {
-      namesystem.writeUnlock();
+      bm.writeUnlock();
     }
   }
@@ -220,18 +220,17 @@ public class BlockManagerTestUtil {
    * Call heartbeat check function of HeartbeatManager and get
    * under replicated blocks count within write lock to make sure
    * computeDatanodeWork doesn't interfere.
-   * @param namesystem the FSNamesystem
    * @param bm the BlockManager to manipulate
    * @return the number of under replicated blocks
    */
   public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
-      FSNamesystem namesystem, BlockManager bm) {
-    namesystem.writeLock();
+      BlockManager bm) {
+    bm.writeLock();
     try {
       bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
       return bm.getUnderReplicatedNotMissingBlocks();
     } finally {
-      namesystem.writeUnlock();
+      bm.writeUnlock();
    }
  }
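
With the lock carried by the BlockManager itself, the test helper drops its
FSNamesystem parameter. A hypothetical caller after the signature change might
look like the following; obtaining bm from a running cluster is left out:

    package org.apache.hadoop.hdfs.server.blockmanagement;

    // Illustrative caller only; in a real test, bm would come from the
    // running NameNode, e.g. cluster.getNamesystem().getBlockManager().
    class UnderReplicationCheckExample {
      static int underReplicated(BlockManager bm) {
        return BlockManagerTestUtil
            .checkHeartbeatAndGetUnderReplicatedBlocksCount(bm);
      }
    }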
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 094794b..c05c61d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -34,6 +35,7 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,12 +65,12 @@ import org.junit.Assert;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestBlockManager {
   private DatanodeStorageInfo[] storages;
@@ -97,10 +99,15 @@ public class TestBlockManager {
     Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
              "need to set a dummy value here so it assumes a multi-rack cluster");
-    fsn = Mockito.mock(FSNamesystem.class);
-    Mockito.doReturn(true).when(fsn).hasWriteLock();
-    Mockito.doReturn(true).when(fsn).hasReadLock();
+    fsn = mock(FSNamesystem.class);
+    Lock lockImpl = mock(Lock.class);
+    BlockManagerLock lock = mock(BlockManagerLock.class);
     bm = new BlockManager(fsn, conf);
+    Whitebox.setInternalState(bm, "lock", lock);
+    doReturn(true).when(lock).hasWriteLock();
+    doReturn(true).when(lock).hasReadLock();
+    doReturn(lockImpl).when(lock).readLock();
+    doReturn(lockImpl).when(lock).writeLock();
     final String[] racks = {
         "/rackA",
         "/rackA",
@@ -438,9 +445,9 @@ public class TestBlockManager {
 
   private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
     long inodeId = ++mockINodeId;
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn(inodeId).when(bc).getId();
-    Mockito.doReturn(bc).when(fsn).getBlockCollection(inodeId);
+    BlockCollection bc = mock(BlockCollection.class);
+    doReturn(inodeId).when(bc).getId();
+    doReturn(bc).when(fsn).getBlockCollection(inodeId);
     BlockInfo blockInfo = blockOnNodes(blockId, nodes);
 
     blockInfo.setReplication((short) 3);
@@ -749,7 +756,7 @@ public class TestBlockManager {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
 
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
+    BlockCollection bc = mock(BlockCollection.class);
     long inodeId = ++mockINodeId;
     doReturn(inodeId).when(bc).getId();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
@@ -761,7 +768,7 @@ public class TestBlockManager {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
     blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
+    BlockCollection bc = mock(BlockCollection.class);
     long inodeId = ++mockINodeId;
     doReturn(inodeId).when(bc).getId();
     bm.blocksMap.addBlockCollection(blockInfo, bc);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index b55a716..8bad600 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.util.Shell;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import static org.hamcrest.core.Is.is;
@@ -70,6 +71,7 @@ public class TestDatanodeManager {
   private static DatanodeManager mockDatanodeManager(
       FSNamesystem fsn, Configuration conf) throws IOException {
     BlockManager bm = Mockito.mock(BlockManager.class);
+    Mockito.doReturn(true).when(bm).hasWriteLock();
     BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
     Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index b8a7e77..ba8e966 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -64,6 +65,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.internal.util.reflection.Whitebox;
 
 @RunWith(Parameterized.class)
 public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@@ -1137,9 +1139,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   public void testAddStoredBlockDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasWriteLock()).thenReturn(true);
-    when(mockNS.hasReadLock()).thenReturn(true);
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    when(lock.hasWriteLock()).thenReturn(true);
+    when(lock.hasReadLock()).thenReturn(true);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
@@ -1186,9 +1190,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasReadLock()).thenReturn(true);
-
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    Lock impl = mock(Lock.class);
+    when(lock.hasReadLock()).thenReturn(true);
+    when(lock.writeLock()).thenReturn(impl);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     long blkID1 = ThreadLocalRandom.current().nextLong();
@@ -1258,9 +1265,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasReadLock()).thenReturn(true);
-
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    Lock impl = mock(Lock.class);
+    when(lock.hasReadLock()).thenReturn(true);
+    when(lock.writeLock()).thenReturn(impl);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b645d67a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 2c4fcc5..98519c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -393,8 +393,7 @@ public class TestDataNodeVolumeFailure {
 
     // underReplicatedBlocks are due to failed volumes
     int underReplicatedBlocks =
-        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
-            cluster.getNamesystem(), bm);
+        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(bm);
     assertTrue("There is no under replicated block after volume failure",
         underReplicatedBlocks > 0);
   }
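
The test changes all follow one recipe: build the object under test, then use
Whitebox.setInternalState to swap its private lock field for a Mockito mock
whose hold-state queries return true (and whose readLock()/writeLock() return
a mock Lock where the code actually locks). A self-contained sketch of that
recipe, where Holder and ILock are stand-ins for BlockManager and
BlockManagerLock, and Whitebox is Mockito 1.x's internal reflection helper as
used in the diff:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.mockito.internal.util.reflection.Whitebox;

    // Generic shape of the test setup used above.
    public class LockMockingSketch {
      interface ILock {
        boolean hasReadLock();
        boolean hasWriteLock();
      }

      static class Holder {
        private ILock lock;                 // same shape as BlockManager's field
        boolean writeLocked() { return lock.hasWriteLock(); }
      }

      public static void main(String[] args) {
        Holder holder = new Holder();
        ILock lock = mock(ILock.class);
        when(lock.hasReadLock()).thenReturn(true);
        when(lock.hasWriteLock()).thenReturn(true);
        // Inject the mock into the private field, bypassing the constructor.
        Whitebox.setInternalState(holder, "lock", lock);
        System.out.println(holder.writeLocked());   // true, via the mock
      }
    }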