hadoop-common-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r672977 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
Date: Tue, 01 Jul 2008 02:35:55 GMT
Author: shv
Date: Mon Jun 30 19:35:54 2008
New Revision: 672977

URL: http://svn.apache.org/viewvc?rev=672977&view=rev
Log:
HADOOP-3649. Merge -r 672975:672976 from trunk to branch 0.18.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=672977&r1=672976&r2=672977&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Mon Jun 30 19:35:54 2008
@@ -693,6 +693,9 @@
 
     HADOOP-3572. SetQuotas usage interface has some minor bugs. (hairong)
 
+    HADOOP-3649. Fix bug in removing blocks from the corrupted block map.
+    (Lohit Vijayarenu via shv)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java?rev=672977&r1=672976&r2=672977&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java Mon Jun 30 19:35:54 2008
@@ -20,7 +20,6 @@
 import org.apache.hadoop.ipc.Server;
 
 import java.util.*;
-import java.io.IOException;
 
 /**
  * Stores information about all corrupt blocks in the File System.
@@ -71,9 +70,6 @@
    * @param blk Block to be removed
    */
   void removeFromCorruptReplicasMap(Block blk) {
-    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
-    if (fsNamesystem.blocksMap.contains(blk))
-      return;
     if (corruptReplicasMap != null) {
       corruptReplicasMap.remove(blk);
       NameNode.getNameNodeMetrics().numBlocksCorrupted.set(
@@ -103,29 +99,8 @@
     return ((nodes != null) && (nodes.contains(node)));
   }
 
-  /**
-   * Invalidate corrupt replicas
-   *
-   * @param blk Block whose corrupt replicas need to be invalidated
-   */
-  void invalidateCorruptReplicas(Block blk) {
-    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+  int numCorruptReplicas(Block blk) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
-    boolean gotException = false;
-    if (nodes == null)
-      return;
-    for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
-      DatanodeDescriptor node = it.next();
-      try {
-        fsNamesystem.invalidateBlock(blk, node);
-      } catch (IOException e) {
-        NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
-                                      "error in deleting bad block " + blk +
-                                      " on " + node + e);
-      }
-    }
-    // Remove the block from corruptReplicasMap if empty
-    if (!gotException)
-      removeFromCorruptReplicasMap(blk);
+    return (nodes == null) ? 0 : nodes.size();
   }
 }
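
Net effect of the two hunks above, matching the new CHANGES.txt entry: removeFromCorruptReplicasMap loses its blocksMap guard (the old early return meant a block still referenced by a file could never be purged from the map), and the map-side invalidateCorruptReplicas gives way to a plain counter, with invalidation moving into FSNamesystem. A minimal sketch of the resulting contract, using hypothetical String stand-ins rather than the actual Block and DatanodeDescriptor types:

    import java.util.*;

    // Hypothetical simplification of CorruptReplicasMap after this patch;
    // String stands in for Block and DatanodeDescriptor.
    class CorruptReplicasMapSketch {
      private final Map<String, Set<String>> corruptReplicasMap =
        new TreeMap<String, Set<String>>();

      void addToCorruptReplicasMap(String blk, String node) {
        Set<String> nodes = corruptReplicasMap.get(blk);
        if (nodes == null) {
          nodes = new TreeSet<String>();
          corruptReplicasMap.put(blk, nodes);
        }
        nodes.add(node);
      }

      // No more blocksMap guard: callers now decide when removal is safe.
      void removeFromCorruptReplicasMap(String blk) {
        corruptReplicasMap.remove(blk);
      }

      // Replaces the map-side invalidateCorruptReplicas: the map only
      // counts; invalidation itself moved to FSNamesystem (next diff).
      int numCorruptReplicas(String blk) {
        Set<String> nodes = corruptReplicasMap.get(blk);
        return (nodes == null) ? 0 : nodes.size();
      }
    }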

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java?rev=672977&r1=672976&r2=672977&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java Mon Jun 30 19:35:54 2008
@@ -1345,8 +1345,12 @@
                                    "block " + blk + " could not be marked " +
                                    "as corrupt as it does not exists in " +
                                    "blocksMap");
-    else 
+    else {
+      // Add this replica to corruptReplicasMap and
+      // add the block to neededReplication
       corruptReplicas.addToCorruptReplicasMap(blk, node);
+      updateNeededReplications(blk, 0, 1);
+    }
   }
 
   /**
@@ -2803,7 +2807,8 @@
 
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(storedBlock);
-    int numCurrentReplica = num.liveReplicas()
+    int numLiveReplicas = num.liveReplicas();
+    int numCurrentReplica = numLiveReplicas
       + pendingReplications.getNumReplicas(block);
 
     // check whether safe replication is reached for the block
@@ -2835,12 +2840,46 @@
     // If the file replication has reached desired value
     // we can remove any corrupt replicas the block may have
     int corruptReplicasCount = num.corruptReplicas();
-    if ((corruptReplicasCount > 0) && (numCurrentReplica == fileReplication))
-      corruptReplicas.invalidateCorruptReplicas(block);
+    if ((corruptReplicasCount > 0) && (numLiveReplicas == fileReplication)) 
+      invalidateCorruptReplicas(block);
     return block;
   }
 
   /**
+   * Invalidate corrupt replicas.
+   * <p>
+   * This will remove the replicas from the block's location list,
+   * add them to {@link #recentInvalidateSets} so that they could be further
+   * deleted from the respective data-nodes,
+   * and remove the block from corruptReplicasMap.
+   * <p>
+   * This method should be called when the block has sufficient
+   * number of live replicas.
+   *
+   * @param blk Block whose corrupt replicas need to be invalidated
+   */
+  void invalidateCorruptReplicas(Block blk) {
+    Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
+    boolean gotException = false;
+    if (nodes == null)
+      return;
+    for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
+      DatanodeDescriptor node = it.next();
+      try {
+        invalidateBlock(blk, node);
+      } catch (IOException e) {
+        NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
+                                      "error in deleting bad block " + blk +
+                                      " on " + node + e);
+        gotException = true;
+      }
+    }
+    // Remove the block from corruptReplicasMap
+    if (!gotException)
+      corruptReplicas.removeFromCorruptReplicasMap(blk);
+  }
+
+  /**
    * For each block in the name-node verify whether it belongs to any file,
    * over or under replicated. Place it into the respective queue.
    */
@@ -3067,7 +3106,8 @@
       }
     }
     // If block is removed from blocksMap, remove it from corruptReplicas
-    corruptReplicas.removeFromCorruptReplicasMap(block);
+    if (fileINode == null)
+      corruptReplicas.removeFromCorruptReplicasMap(block);
   }
 
   /**
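
The method body is the one lifted out of CorruptReplicasMap, with one real fix along the way: the removed copy declared gotException but never set it in the catch clause, so the block left corruptReplicasMap even when an invalidation failed, while the new copy sets the flag and keeps the entry for a later pass. A minimal, self-contained sketch of that control flow, again with hypothetical String stand-ins for the Hadoop types:

    import java.io.IOException;
    import java.util.*;

    // Hypothetical simplification; corruptReplicas mirrors corruptReplicasMap.
    class InvalidationFlowSketch {
      private final Map<String, Set<String>> corruptReplicas =
        new HashMap<String, Set<String>>();

      // Called only once the block has as many live replicas as its file
      // requires (numLiveReplicas == fileReplication in the hunk above).
      void invalidateCorruptReplicas(String blk) {
        Collection<String> nodes = corruptReplicas.get(blk);
        if (nodes == null)
          return;
        boolean gotException = false;
        for (Iterator<String> it = nodes.iterator(); it.hasNext(); ) {
          String node = it.next();
          try {
            invalidateBlock(blk, node);  // schedule deletion on that datanode
          } catch (IOException e) {
            gotException = true;         // keep the map entry; retry later
          }
        }
        // Drop the entry only if every corrupt replica was scheduled
        // for deletion.
        if (!gotException)
          corruptReplicas.remove(blk);
      }

      private void invalidateBlock(String blk, String node) throws IOException {
        // Stand-in for FSNamesystem.invalidateBlock(Block, DatanodeDescriptor).
      }
    }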

Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java?rev=672977&r1=672976&r2=672977&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java Mon Jun 30 19:35:54 2008
@@ -27,7 +27,6 @@
 import java.util.regex.Pattern;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -142,9 +141,10 @@
     cluster.shutdown();
   }
 
-  void corruptReplica(String blockName, int replica) throws IOException {
+  boolean corruptReplica(String blockName, int replica) throws IOException {
     Random random = new Random();
     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+    boolean corrupted = false;
     for (int i=replica*2; i<replica*2+2; i++) {
       File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
                                blockName);
@@ -157,8 +157,10 @@
         raFile.seek(rand);
         raFile.write(badString.getBytes());
         raFile.close();
+        corrupted = true;
       }
     }
+    return corrupted;
   }
 
   public void testBlockCorruptionPolicy() throws IOException {
@@ -241,4 +243,143 @@
 
     cluster.shutdown();
   }
+  
+  /**
+   * testBlockCorruptionRecoveryPolicy.
+   * This tests recovery of corrupt replicas, first for one corrupt replica
+   * then for two. The test invokes blockCorruptionRecoveryPolicy which
+   * 1. Creates a block with desired number of replicas
+   * 2. Corrupts the desired number of replicas and restarts the datanodes
+   *    containing the corrupt replica. Additionally we read the block
+   *    in case restarting does not report corrupt replicas.
+   *    Restarting or reading from the datanode would trigger reportBadBlocks 
+   *    to namenode.
+   *    NameNode adds it to corruptReplicasMap and neededReplication
+   * 3. Test waits until all corrupt replicas are reported, meanwhile
+   *    re-replication brings the block back to a healthy state
+   * 4. Test again waits until the block is reported with expected number
+   *    of good replicas.
+   */
+  public void testBlockCorruptionRecoveryPolicy() throws IOException {
+    // Test recovery of 1 corrupt replica
+    LOG.info("Testing corrupt replica recovery for one corrupt replica");
+    blockCorruptionRecoveryPolicy(4, (short)3, 1);
+
+    // Test recovery of 2 corrupt replicas
+    LOG.info("Testing corrupt replica recovery for two corrupt replicas");
+    blockCorruptionRecoveryPolicy(5, (short)3, 2);
+  }
+  
+  private void blockCorruptionRecoveryPolicy(int numDataNodes, 
+                                             short numReplicas,
+                                             int numCorruptReplicas) 
+                                             throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.blockreport.intervalMsec", 30L);
+    conf.setLong("dfs.replication.interval", 30);
+    conf.setLong("dfs.heartbeat.interval", 30L);
+    conf.setBoolean("dfs.replication.considerLoad", false);
+    Random random = new Random();
+    FileSystem fs = null;
+    DFSClient dfsClient = null;
+    LocatedBlocks blocks = null;
+    int replicaCount = 0;
+    int rand = random.nextInt(numDataNodes);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
+    DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
+    Block blk = DFSTestUtil.getFirstBlock(fs, file1);
+    String block = blk.getBlockName();
+    
+    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+                                        cluster.getNameNodePort()), conf);
+    blocks = dfsClient.namenode.
+               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    replicaCount = blocks.get(0).getLocations().length;
+
+    // Wait until block is replicated to numReplicas
+    while (replicaCount != numReplicas) {
+      try {
+        LOG.info("Looping until expected replicaCount of " + numReplicas +
+                  " is reached");
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+      replicaCount = blocks.get(0).getLocations().length;
+    }
+    assertTrue(blocks.get(0).isCorrupt() == false);
+
+    // Corrupt numCorruptReplicas replicas of block 
+    int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
+    for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
+      if (corruptReplica(block, i)) 
+        corruptReplicasDNIDs[j++] = i;
+    }
+    
+    // Restart the datanodes containing corrupt replicas 
+    // so they would be reported to namenode and re-replicated
+    for (int i =0; i < numCorruptReplicas; i++) 
+     cluster.restartDataNode(corruptReplicasDNIDs[i]);
+
+    // Loop until all corrupt replicas are reported
+    int corruptReplicaSize = cluster.getNameNode().namesystem.
+                              corruptReplicas.numCorruptReplicas(blk);
+    while (corruptReplicaSize != numCorruptReplicas) {
+      try {
+        IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
+                          conf, true);
+      } catch (IOException e) {
+      }
+      try {
+        LOG.info("Looping until expected " + numCorruptReplicas + " are " +
+                 "reported. Currently reported: " + corruptReplicaSize);
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      corruptReplicaSize = cluster.getNameNode().namesystem.
+                              corruptReplicas.numCorruptReplicas(blk);
+    }
+    
+    // Loop until the block recovers after replication
+    blocks = dfsClient.namenode.
+               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    replicaCount = blocks.get(0).getLocations().length;
+    while (replicaCount != numReplicas) {
+      try {
+        LOG.info("Looping until block gets rereplicated to " + numReplicas);
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      blocks = dfsClient.namenode.
+                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+      replicaCount = blocks.get(0).getLocations().length;
+    }
+
+    // Make sure the corrupt replica is invalidated and removed from
+    // corruptReplicasMap
+    corruptReplicaSize = cluster.getNameNode().namesystem.
+                          corruptReplicas.numCorruptReplicas(blk);
+    while (corruptReplicaSize != 0) {
+      try {
+        LOG.info("Looping until corrupt replica is invalidated");
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      corruptReplicaSize = cluster.getNameNode().namesystem.
+                            corruptReplicas.numCorruptReplicas(blk);
+      blocks = dfsClient.namenode.
+                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+      replicaCount = blocks.get(0).getLocations().length;
+    }
+    // Make sure block is healthy 
+    assertTrue(corruptReplicaSize == 0);
+    assertTrue(replicaCount == numReplicas);
+    assertTrue(blocks.get(0).isCorrupt() == false);
+    cluster.shutdown();
+  }
 }
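
The new test's three wait loops (initial replication, corrupt-replica reports, recovery) all follow the same poll-and-sleep shape. A hedged sketch of that idiom as a tiny helper, hypothetical and not part of the Hadoop test utilities:

    // Hypothetical helper: poll a counter until it reaches the expected
    // value, sleeping between checks, the way the test waits on replica
    // counts. Like the test's own loops, it sets no deadline and relies
    // on the surrounding harness to time out.
    abstract class WaitSketch {
      abstract int actual();      // e.g. current location count for the block

      void waitUntilEquals(int expected) {
        while (actual() != expected) {
          try {
            Thread.sleep(1000);   // give block reports time to arrive
          } catch (InterruptedException ignore) {
          }
        }
      }
    }

Each loop in blockCorruptionRecoveryPolicy is an instance of this shape; only the polled counter changes (the location count from getBlockLocations, or corruptReplicas.numCorruptReplicas).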


