Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E89C41060E for ; Wed, 21 Jan 2015 20:42:20 +0000 (UTC) Received: (qmail 1810 invoked by uid 500); 21 Jan 2015 20:42:18 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 1718 invoked by uid 500); 21 Jan 2015 20:42:18 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 1678 invoked by uid 99); 21 Jan 2015 20:42:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jan 2015 20:42:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 648B4E03AB; Wed, 21 Jan 2015 20:42:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kihwal@apache.org To: common-commits@hadoop.apache.org Message-Id: <239675ade90f430781a3f77977edcedc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed by Rushabh Shah. Date: Wed, 21 Jan 2015 20:42:18 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 925c9fed3 -> c0af72c7f HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed by Rushabh Shah. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0af72c7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0af72c7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0af72c7 Branch: refs/heads/trunk Commit: c0af72c7f74b6925786e24543cac433b906dd6d3 Parents: 925c9fe Author: Kihwal Lee Authored: Wed Jan 21 14:41:31 2015 -0600 Committer: Kihwal Lee Committed: Wed Jan 21 14:41:31 2015 -0600 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/datanode/BlockPoolSliceScanner.java | 49 ++++++++++++++-- .../hdfs/server/datanode/BlockSender.java | 3 + .../hdfs/server/datanode/DataBlockScanner.java | 15 ++++- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 4 +- .../hadoop/hdfs/TestDatanodeBlockScanner.java | 60 +++++++++++++++++++- 7 files changed, 125 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0a301f8..25ad33b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -756,6 +756,9 @@ Release 2.7.0 - UNRELEASED HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via Colin P. McCabe) + HDFS-7548. Corrupt block reporting delayed until datablock scanner thread + detects it (Rushabh Shah via kihwal) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index 61f1e7e..f36fea1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -105,6 +105,7 @@ class BlockPoolSliceScanner { private long bytesLeft = 0; // Bytes to scan in this period private long totalBytesToScan = 0; private boolean isNewPeriod = true; + private int lastScanTimeDifference = 5*60*1000; private final LogFileHandler verificationLog; @@ -112,6 +113,7 @@ class BlockPoolSliceScanner { 200, MAX_SCAN_RATE); private static enum ScanType { + IMMEDIATE_SCAN, VERIFICATION_SCAN, // scanned as part of periodic verfication NONE, } @@ -129,12 +131,17 @@ class BlockPoolSliceScanner { @Override public int compare(BlockScanInfo left, BlockScanInfo right) { + final ScanType leftNextScanType = left.nextScanType; + final ScanType rightNextScanType = right.nextScanType; final long l = left.lastScanTime; final long r = right.lastScanTime; + // Compare by nextScanType if they are same then compare by + // lastScanTimes // compare blocks itself if scantimes are same to avoid. // because TreeMap uses comparator if available to check existence of // the object. - return l < r? -1: l > r? 1: left.compareTo(right); + int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType); + return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 1: left.compareTo(right); } }; @@ -142,6 +149,7 @@ class BlockPoolSliceScanner { ScanType lastScanType = ScanType.NONE; boolean lastScanOk = true; private LinkedElement next; + ScanType nextScanType = ScanType.VERIFICATION_SCAN; BlockScanInfo(Block block) { super(block); @@ -265,10 +273,12 @@ class BlockPoolSliceScanner { private synchronized void updateBlockInfo(LogEntry e) { BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp)); - if(info != null && e.verificationTime > 0 && + if (info != null && e.verificationTime > 0 && info.lastScanTime < e.verificationTime) { delBlockInfo(info); - info.lastScanTime = e.verificationTime; + if (info.nextScanType != ScanType.IMMEDIATE_SCAN) { + info.lastScanTime = e.verificationTime; + } info.lastScanType = ScanType.VERIFICATION_SCAN; addBlockInfo(info, false); } @@ -285,9 +295,23 @@ class BlockPoolSliceScanner { DFSUtil.getRandom().nextInt(periodInt); } - /** Adds block to list of blocks */ - synchronized void addBlock(ExtendedBlock block) { + /** Adds block to list of blocks + * @param scanNow - true if we want to make that particular block a high + * priority one to scan immediately + **/ + synchronized void addBlock(ExtendedBlock block, boolean scanNow) { BlockScanInfo info = blockMap.get(block.getLocalBlock()); + long lastScanTime = 0; + if (info != null) { + lastScanTime = info.lastScanTime; + } + // If the particular block is scanned in last 5 minutes, the no need to + // verify that block again + if (scanNow && Time.monotonicNow() - lastScanTime < + lastScanTimeDifference) { + return; + } + if ( info != null ) { LOG.warn("Adding an already existing block " + block); delBlockInfo(info); @@ -295,6 +319,12 @@ class BlockPoolSliceScanner { info = new BlockScanInfo(block.getLocalBlock()); info.lastScanTime = getNewBlockScanTime(); + if (scanNow) { + // Create a new BlockScanInfo object and set the lastScanTime to 0 + // which will make it the high priority block + LOG.info("Adding block for immediate verification " + block); + info.nextScanType = ScanType.IMMEDIATE_SCAN; + } addBlockInfo(info, true); adjustThrottler(); @@ -340,6 +370,7 @@ class BlockPoolSliceScanner { info.lastScanType = type; info.lastScanTime = now; info.lastScanOk = scanOk; + info.nextScanType = ScanType.VERIFICATION_SCAN; addBlockInfo(info, false); // Don't update meta data if the verification failed. @@ -363,6 +394,11 @@ class BlockPoolSliceScanner { } } + @VisibleForTesting + synchronized void setLastScanTimeDifference(int lastScanTimeDifference) { + this.lastScanTimeDifference = lastScanTimeDifference; + } + static private class LogEntry { long blockId = -1; @@ -502,6 +538,9 @@ class BlockPoolSliceScanner { private synchronized boolean isFirstBlockProcessed() { if (!blockInfoSet.isEmpty()) { + if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) { + return false; + } long blockId = blockInfoSet.first().getBlockId(); if ((processedBlocks.get(blockId) != null) && (processedBlocks.get(blockId) == 1)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 182b366..2d312d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -600,6 +600,9 @@ class BlockSender implements java.io.Closeable { String ioem = e.getMessage(); if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); + //Something might be wrong with the block. Make this block the high + //priority block for verification. + datanode.blockScanner.addBlock(block, true); } } throw ioeToSocketException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java index bee3625..450c2b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @@ -186,10 +186,10 @@ public class DataBlockScanner implements Runnable { new String[blockPoolScannerMap.keySet().size()]); } - public void addBlock(ExtendedBlock block) { + public void addBlock(ExtendedBlock block, boolean scanNow) { BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId()); if (bpScanner != null) { - bpScanner.addBlock(block); + bpScanner.addBlock(block, scanNow); } else { LOG.warn("No block pool scanner found for block pool id: " + block.getBlockPoolId()); @@ -293,6 +293,17 @@ public class DataBlockScanner implements Runnable { } } + @VisibleForTesting + public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) { + BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId()); + if (bpScanner != null) { + bpScanner.setLastScanTimeDifference(lastScanTimeDifference); + } else { + LOG.warn("No block pool scanner found for block pool id: " + + block.getBlockPoolId()); + } + } + public void start() { blockScannerThread = new Thread(this); blockScannerThread.setDaemon(true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 84528e7..12df9d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2170,7 +2170,7 @@ public class DataNode extends ReconfigurableBase } FsVolumeSpi volume = getFSDataset().getVolume(block); if (blockScanner != null && !volume.isTransientStorage()) { - blockScanner.addBlock(block); + blockScanner.addBlock(block, false); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index d8cc287..f990faf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -771,7 +771,7 @@ class FsDatasetImpl implements FsDatasetSpi { } // Replace the old block if any to reschedule the scanning. - datanode.getBlockScanner().addBlock(block); + datanode.getBlockScanner().addBlock(block, false); return replicaInfo; } @@ -2035,7 +2035,7 @@ class FsDatasetImpl implements FsDatasetSpi { final DataBlockScanner blockScanner = datanode.getBlockScanner(); if (!vol.isTransientStorage()) { if (blockScanner != null) { - blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo)); + blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false); } } else { ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java index bf0182b..9e78c10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -28,7 +27,10 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.URL; +import java.util.Collections; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -42,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; @@ -490,4 +493,59 @@ public class TestDatanodeBlockScanner { cluster.shutdown(); } } + +/** + * This test verifies whether block is added to the first location of + * BlockPoolSliceScanner#blockInfoSet + */ + @Test + public void testAddBlockInfoToFirstLocation() throws Exception { + MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()) + .numDataNodes(1).build(); + FileSystem fs = null; + try { + fs = cluster.getFileSystem(); + DataNode dataNode = cluster.getDataNodes().get(0); + // Creating a bunch of blocks + for (int i = 1; i < 10; i++) { + Path fileName = new Path("/test" + i); + DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L); + } + // Get block of the first file created (file1) + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1")); + dataNode.getBlockScanner().setLastScanTimeDifference(block, 0); + // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can + // scan the first set of blocks + Thread.sleep(10000); + Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime( + dataNode, block); + // Create another set of blocks + for (int i = 10; i < 20; i++) { + Path fileName = new Path("/test" + i); + DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L); + } + dataNode.getBlockScanner().addBlock(block, true); + // Sleep so that BlockPoolSliceScanner can scan the second set of blocks + // and one block which we scheduled to rescan + Thread.sleep(10000); + // Get the lastScanTime of all of the second set of blocks + Set lastScanTimeSet = new HashSet(); + for (int i = 10; i < 20; i++) { + long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode, + DFSTestUtil.getFirstBlock(fs, new Path("/test" + i))); + lastScanTimeSet.add(lastScanTime); + } + Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime( + dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1"))); + Long minimumLastScanTime = Collections.min(lastScanTimeSet); + assertTrue("The second scanTime for test1 block should be greater than " + + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block); + assertTrue("The second scanTime for test1 block should be less than or" + + " equal to minimum of the lastScanTime of second set of blocks", + scanTime2Fortest1Block <= minimumLastScanTime); + } finally { + IOUtils.closeStream(fs); + cluster.shutdown(); + } + } }