hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [08/50] [abbrv] hadoop git commit: HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed by Rushabh Shah.
Date Tue, 27 Jan 2015 18:40:26 GMT
HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed
by Rushabh Shah.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0af72c7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0af72c7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0af72c7

Branch: refs/heads/YARN-2928
Commit: c0af72c7f74b6925786e24543cac433b906dd6d3
Parents: 925c9fe
Author: Kihwal Lee <kihwal@apache.org>
Authored: Wed Jan 21 14:41:31 2015 -0600
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Wed Jan 21 14:41:31 2015 -0600

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../server/datanode/BlockPoolSliceScanner.java  | 49 ++++++++++++++--
 .../hdfs/server/datanode/BlockSender.java       |  3 +
 .../hdfs/server/datanode/DataBlockScanner.java  | 15 ++++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  2 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  4 +-
 .../hadoop/hdfs/TestDatanodeBlockScanner.java   | 60 +++++++++++++++++++-
 7 files changed, 125 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0a301f8..25ad33b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -756,6 +756,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via
     Colin P. McCabe)
 
+    HDFS-7548. Corrupt block reporting delayed until datablock scanner thread
+    detects it (Rushabh Shah via kihwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index 61f1e7e..f36fea1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -105,6 +105,7 @@ class BlockPoolSliceScanner {
   private long bytesLeft = 0; // Bytes to scan in this period
   private long totalBytesToScan = 0;
   private boolean isNewPeriod = true;
+  private int lastScanTimeDifference = 5*60*1000;
   
   private final LogFileHandler verificationLog;
   
@@ -112,6 +113,7 @@ class BlockPoolSliceScanner {
        200, MAX_SCAN_RATE);
   
   private static enum ScanType {
+    IMMEDIATE_SCAN,  
     VERIFICATION_SCAN,     // scanned as part of periodic verfication
     NONE,
   }
@@ -129,12 +131,17 @@ class BlockPoolSliceScanner {
 
       @Override
       public int compare(BlockScanInfo left, BlockScanInfo right) {
+        final ScanType leftNextScanType = left.nextScanType;
+        final ScanType rightNextScanType = right.nextScanType;
         final long l = left.lastScanTime;
         final long r = right.lastScanTime;
+        // Compare by nextScanType if they are same then compare by 
+        // lastScanTimes
         // compare blocks itself if scantimes are same to avoid.
         // because TreeMap uses comparator if available to check existence of
         // the object. 
-        return l < r? -1: l > r? 1: left.compareTo(right); 
+        int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
+        return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1:  l <
r? -1: l > r? 1: left.compareTo(right); 
       }
     };
 
@@ -142,6 +149,7 @@ class BlockPoolSliceScanner {
     ScanType lastScanType = ScanType.NONE; 
     boolean lastScanOk = true;
     private LinkedElement next;
+    ScanType nextScanType = ScanType.VERIFICATION_SCAN;
     
     BlockScanInfo(Block block) {
       super(block);
@@ -265,10 +273,12 @@ class BlockPoolSliceScanner {
   private synchronized void updateBlockInfo(LogEntry e) {
     BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
     
-    if(info != null && e.verificationTime > 0 && 
+    if (info != null && e.verificationTime > 0 && 
         info.lastScanTime < e.verificationTime) {
       delBlockInfo(info);
-      info.lastScanTime = e.verificationTime;
+      if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
+        info.lastScanTime = e.verificationTime;
+      }
       info.lastScanType = ScanType.VERIFICATION_SCAN;
       addBlockInfo(info, false);
     }
@@ -285,9 +295,23 @@ class BlockPoolSliceScanner {
         DFSUtil.getRandom().nextInt(periodInt);
   }
 
-  /** Adds block to list of blocks */
-  synchronized void addBlock(ExtendedBlock block) {
+  /** Adds block to list of blocks 
+   * @param scanNow - true if we want to make that particular block a high 
+   * priority one to scan immediately
+   **/
+  synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
     BlockScanInfo info = blockMap.get(block.getLocalBlock());
+    long lastScanTime = 0;
+    if (info != null) {
+      lastScanTime = info.lastScanTime;
+    }
+    // If the particular block is scanned in last 5 minutes, the  no need to 
+    // verify that block again
+    if (scanNow && Time.monotonicNow() - lastScanTime < 
+        lastScanTimeDifference) {
+      return;
+    }
+    
     if ( info != null ) {
       LOG.warn("Adding an already existing block " + block);
       delBlockInfo(info);
@@ -295,6 +319,12 @@ class BlockPoolSliceScanner {
     
     info = new BlockScanInfo(block.getLocalBlock());    
     info.lastScanTime = getNewBlockScanTime();
+    if (scanNow) {
+      // Create a new BlockScanInfo object and set the lastScanTime to 0
+      // which will make it the high priority block
+      LOG.info("Adding block for immediate verification " + block);
+      info.nextScanType = ScanType.IMMEDIATE_SCAN;
+    }
     
     addBlockInfo(info, true);
     adjustThrottler();
@@ -340,6 +370,7 @@ class BlockPoolSliceScanner {
     info.lastScanType = type;
     info.lastScanTime = now;
     info.lastScanOk = scanOk;
+    info.nextScanType = ScanType.VERIFICATION_SCAN;
     addBlockInfo(info, false);
         
     // Don't update meta data if the verification failed.
@@ -363,6 +394,11 @@ class BlockPoolSliceScanner {
     }
   }
   
+  @VisibleForTesting
+  synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
+    this.lastScanTimeDifference = lastScanTimeDifference;
+  }
+  
   static private class LogEntry {
 
     long blockId = -1;
@@ -502,6 +538,9 @@ class BlockPoolSliceScanner {
   
   private synchronized boolean isFirstBlockProcessed() {
     if (!blockInfoSet.isEmpty()) {
+      if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
+        return false;
+      }
       long blockId = blockInfoSet.first().getBlockId();
       if ((processedBlocks.get(blockId) != null)
           && (processedBlocks.get(blockId) == 1)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 182b366..2d312d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -600,6 +600,9 @@ class BlockSender implements java.io.Closeable {
         String ioem = e.getMessage();
         if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset"))
{
           LOG.error("BlockSender.sendChunks() exception: ", e);
+          //Something might be wrong with the block. Make this block the high 
+          //priority block for verification.
+          datanode.blockScanner.addBlock(block, true);
         }
       }
       throw ioeToSocketException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
index bee3625..450c2b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
@@ -186,10 +186,10 @@ public class DataBlockScanner implements Runnable {
         new String[blockPoolScannerMap.keySet().size()]);
   }
   
-  public void addBlock(ExtendedBlock block) {
+  public void addBlock(ExtendedBlock block, boolean scanNow) {
     BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
     if (bpScanner != null) {
-      bpScanner.addBlock(block);
+      bpScanner.addBlock(block, scanNow);
     } else {
       LOG.warn("No block pool scanner found for block pool id: "
           + block.getBlockPoolId());
@@ -293,6 +293,17 @@ public class DataBlockScanner implements Runnable {
     }
   }
 
+  @VisibleForTesting
+  public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference)
{
+    BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
+    if (bpScanner != null) {
+      bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
+    } else {
+      LOG.warn("No block pool scanner found for block pool id: "
+          + block.getBlockPoolId());
+    }
+  }
+  
   public void start() {
     blockScannerThread = new Thread(this);
     blockScannerThread.setDaemon(true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 84528e7..12df9d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2170,7 +2170,7 @@ public class DataNode extends ReconfigurableBase
     }
     FsVolumeSpi volume = getFSDataset().getVolume(block);
     if (blockScanner != null && !volume.isTransientStorage()) {
-      blockScanner.addBlock(block);
+      blockScanner.addBlock(block, false);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d8cc287..f990faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -771,7 +771,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     // Replace the old block if any to reschedule the scanning.
-    datanode.getBlockScanner().addBlock(block);
+    datanode.getBlockScanner().addBlock(block, false);
     return replicaInfo;
   }
 
@@ -2035,7 +2035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
         if (!vol.isTransientStorage()) {
           if (blockScanner != null) {
-            blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+            blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
           }
         } else {
           ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0af72c7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
index bf0182b..9e78c10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -28,7 +27,10 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -42,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -490,4 +493,59 @@ public class TestDatanodeBlockScanner {
       cluster.shutdown();
     }
   }
+  
+/**
+ * This test verifies whether block is added to the first location of 
+ * BlockPoolSliceScanner#blockInfoSet
+ */
+  @Test
+  public void testAddBlockInfoToFirstLocation() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
+        .numDataNodes(1).build();
+    FileSystem fs = null;
+    try {
+      fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      // Creating a bunch of blocks
+      for (int i = 1; i < 10; i++) {
+        Path fileName = new Path("/test" + i);
+        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
+      } 
+      // Get block of the first file created (file1)
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
+      dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
+      // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
+      // scan the first set of blocks
+      Thread.sleep(10000);
+      Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
+          dataNode, block);
+      // Create another set of blocks
+      for (int i = 10; i < 20; i++) {
+        Path fileName = new Path("/test" + i);
+        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
+      }
+      dataNode.getBlockScanner().addBlock(block, true);
+      // Sleep so that BlockPoolSliceScanner can scan the second set of blocks
+      // and one block which we scheduled to rescan
+      Thread.sleep(10000);
+      // Get the lastScanTime of all of the second set of blocks
+      Set<Long> lastScanTimeSet = new HashSet<Long>();
+      for (int i = 10; i < 20; i++) {
+        long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
+            DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
+        lastScanTimeSet.add(lastScanTime);
+      }
+      Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
+          dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
+      Long minimumLastScanTime = Collections.min(lastScanTimeSet);
+      assertTrue("The second scanTime for test1 block should be greater than "
+         + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
+      assertTrue("The second scanTime for test1 block should be less than or"
+         + " equal to minimum of the lastScanTime of second set of blocks",
+          scanTime2Fortest1Block <= minimumLastScanTime);
+    } finally {
+      IOUtils.closeStream(fs);
+      cluster.shutdown();
+    }
+  }
 }


Mime
View raw message