hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rang...@apache.org
Subject svn commit: r655660 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Mon, 12 May 2008 21:52:03 GMT
Author: rangadi
Date: Mon May 12 14:52:02 2008
New Revision: 655660

URL: http://svn.apache.org/viewvc?rev=655660&view=rev
Log:
HADOOP-2065. Delay invalidating corrupt replicas of a block until it
is removed from the under-replicated state. If all replicas are found to
be corrupt, retain all copies and mark the block as corrupt.
(Lohit Vijayarenu via rangadi)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 12 14:52:02 2008
@@ -74,6 +74,11 @@
     HADOOP-1915. Allow users to specify counters via strings instead
     of enumerations. (tomwhite via omalley)
 
+    HADOOP-2065. Delay invalidating corrupt replicas of a block until it 
+    is removed from the under-replicated state. If all replicas are found to 
+    be corrupt, retain all copies and mark the block as corrupt.
+    (Lohit Vijayarenu via rangadi)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java Mon May 12 14:52:02 2008
@@ -383,4 +383,10 @@
   Collection<BlockInfo> getBlocks() {
     return map.values();
   }
+  /**
+   * Check if the block exists in map
+   */
+  boolean contains(Block block) {
+    return map.containsKey(block);
+  }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Mon May 12 14:52:02
2008
@@ -37,9 +37,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 31: changed the serialization in DatanodeRegistration and DatanodeInfo
+   * 32: add corrupt field to LocatedBlock
    */
-  public static final long versionID = 31L;
+  public static final long versionID = 32L;
   
   ///////////////////////////////////////
   // File contents

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon May 12 14:52:02
2008
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 13: changed the serialization in DatanodeRegistration and DatanodeInfo
+   * 14: add corrupt field to LocatedBlock
    */
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
   
   // error code
   final static int NOTIFY = 0;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon May 12 14:52:02
2008
@@ -308,6 +308,8 @@
       // modify file-> block and blocksMap
       fileNode.removeBlock(block);
       namesystem.blocksMap.removeINode(block);
+      // If block is removed from blocksMap remove it from corruptReplicasMap
+      namesystem.corruptReplicas.removeFromCorruptReplicasMap(block);
 
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
@@ -546,6 +548,8 @@
           totalInodes -= filesRemoved;
           for (Block b : v) {
             namesystem.blocksMap.removeINode(b);
+            // If block is removed from blocksMap remove it from corruptReplicasMap
+            namesystem.corruptReplicas.removeFromCorruptReplicasMap(b);
             if (deletedBlocks != null) {
               deletedBlocks.add(b);
             }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon May 12 14:52:02
2008
@@ -91,6 +91,11 @@
   // Mapping: Block -> { INode, datanodes, self ref } 
   //
   BlocksMap blocksMap = new BlocksMap();
+
+  //
+  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
+  //
+  CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
     
   /**
    * Stores the datanode -> block map.  
@@ -718,15 +723,26 @@
     do {
       // get block locations
       int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numNodes];
-      if (numNodes > 0) {
+      Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(
+                                                      blocks[curBlk]);
+      int numCorruptNodes = (nodesCorrupt == null) ? 0 : nodesCorrupt.size();
+      boolean blockCorrupt = (numCorruptNodes == numNodes);
+      int numMachineSet = blockCorrupt ? numNodes : 
+                            (numNodes - numCorruptNodes);
+      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
+      if (numMachineSet > 0) {
         numNodes = 0;
         for(Iterator<DatanodeDescriptor> it = 
             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          machineSet[numNodes++] = it.next();
+          DatanodeDescriptor dn = it.next();
+          boolean replicaCorrupt = ((nodesCorrupt != null) &&
+                                    nodesCorrupt.contains(dn));
+          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
+            machineSet[numNodes++] = dn;
         }
       }
-      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
+      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
+                  blockCorrupt));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -1263,6 +1279,22 @@
   }
 
   /**
+   * Mark the block belonging to datanode as corrupt
+   * @param blk Block to be marked as corrupt
+   * @param datanode Datanode which holds the corrupt replica
+   */
+  public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
+    throws IOException {
+    DatanodeDescriptor node = getDatanode(dn);
+    if (node == null) {
+      throw new IOException("Cannot mark block" + blk.getBlockName() +
+                            " as corrupt because datanode " + dn.getName() +
+                            " does not exist. ");
+    }
+    corruptReplicas.addToCorruptReplicasMap(blk, node);
+  }
+
+  /**
    * Invalidates the given block on the given datanode.
    */
   public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
@@ -2138,14 +2170,22 @@
     DatanodeDescriptor srcNode = null;
     int live = 0;
     int decommissioned = 0;
+    int corrupt = 0;
     Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      if(!node.isDecommissionInProgress() && !node.isDecommissioned())
+      Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(block);
+      if ((nodes != null) && (nodes.contains(node)))
+        corrupt++;
+      else if(!node.isDecommissionInProgress() && !node.isDecommissioned())
         live++;
       else
         decommissioned++;
       containingNodes.add(node);
+      // Check if this replica is corrupt
+      // If so, do not select the node as src node
+      if ((nodes != null) && nodes.contains(node))
+        continue;
       if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
         continue; // already reached replication limit
       // the block must not be scheduled for removal on srcNode
@@ -2170,7 +2210,7 @@
         srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned);
+      numReplicas.initialize(live, decommissioned, corrupt);
     return srcNode;
   }
 
@@ -2598,6 +2638,11 @@
     if (numCurrentReplica > fileReplication) {
       proccessOverReplicatedBlock(block, fileReplication, node, delNodeHint);
     }
+    // If the file replication has reached desired value
+    // we can remove any corrupt replicas the block may have
+    int corruptReplicasCount = num.corruptReplicas();
+    if ((corruptReplicasCount > 0) && (numCurrentReplica == fileReplication))
+      corruptReplicas.invalidateCorruptReplicas(block);
     return block;
   }
 
@@ -2827,6 +2872,8 @@
         excessReplicateMap.remove(node.getStorageID());
       }
     }
+    // If block is removed from blocksMap, remove it from corruptReplicas
+    corruptReplicas.removeFromCorruptReplicasMap(block);
   }
 
   /**
@@ -3079,18 +3126,20 @@
   private static class NumberReplicas {
     private int liveReplicas;
     private int decommissionedReplicas;
+    private int corruptReplicas;
 
     NumberReplicas() {
-      initialize(0, 0);
+      initialize(0, 0, 0);
     }
 
-    NumberReplicas(int live, int decommissioned) {
-      initialize(live, decommissioned);
+    NumberReplicas(int live, int decommissioned, int corrupt) {
+      initialize(live, decommissioned, corrupt);
     }
 
-    void initialize(int live, int decommissioned) {
+    void initialize(int live, int decommissioned, int corrupt) {
       liveReplicas = live;
       decommissionedReplicas = decommissioned;
+      corruptReplicas = corrupt;
     }
 
     int liveReplicas() {
@@ -3099,32 +3148,41 @@
     int decommissionedReplicas() {
       return decommissionedReplicas;
     }
+    int corruptReplicas() {
+      return corruptReplicas;
+    }
   } 
 
   /**
    * Counts the number of nodes in the given list into active and
    * decommissioned counters.
    */
-  private NumberReplicas countNodes(Iterator<DatanodeDescriptor> nodeIter) {
+  private NumberReplicas countNodes(Block b,
+                                    Iterator<DatanodeDescriptor> nodeIter) {
     int count = 0;
     int live = 0;
+    int corrupt = 0;
+    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while ( nodeIter.hasNext() ) {
       DatanodeDescriptor node = nodeIter.next();
-      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
+        corrupt++;
+      }
+      else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       }
       else {
         live++;
       }
     }
-    return new NumberReplicas(live, count);
+    return new NumberReplicas(live, count, corrupt);
   }
 
   /**
    * Return the number of nodes that are live and decommissioned.
    */
   private NumberReplicas countNodes(Block b) {
-    return countNodes(blocksMap.nodeIterator(b));
+    return countNodes(b, blocksMap.nodeIterator(b));
   }
 
   /**

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java Mon May 12 14:52:02
2008
@@ -39,24 +39,36 @@
   private Block b;
   private long offset;  // offset of the first byte of the block in the file
   private DatanodeInfo[] locs;
+  // corrupt flag is true if all of the replicas of a block are corrupt.
+  // else false. If block has few corrupt replicas, they are filtered and 
+  // their locations are not part of this object
+  private boolean corrupt;
 
   /**
    */
   public LocatedBlock() {
-    this(new Block(), new DatanodeInfo[0], 0L);
+    this(new Block(), new DatanodeInfo[0], 0L, false);
   }
 
   /**
    */
   public LocatedBlock(Block b, DatanodeInfo[] locs) {
-    this(b, locs, -1); // startOffset is unknown
+    this(b, locs, -1, false); // startOffset is unknown
   }
 
   /**
    */
   public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset) {
+    this(b, locs, startOffset, false);
+  }
+
+  /**
+   */
+  public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset, 
+                      boolean corrupt) {
     this.b = b;
     this.offset = startOffset;
+    this.corrupt = corrupt;
     if (locs==null) {
       this.locs = new DatanodeInfo[0];
     } else {
@@ -88,10 +100,19 @@
     this.offset = value;
   }
 
+  void setCorrupt(boolean corrupt) {
+    this.corrupt = corrupt;
+  }
+  
+  boolean isCorrupt() {
+    return this.corrupt;
+  }
+
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
+    out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
     out.writeInt(locs.length);
@@ -101,6 +122,7 @@
   }
 
   public void readFields(DataInput in) throws IOException {
+    this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();
     b.readFields(in);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon May 12 14:52:02 2008
@@ -351,7 +351,7 @@
   /**
    * The client has detected an error on the specified located blocks 
    * and is reporting them to the server.  For now, the namenode will 
-   * delete the blocks from the datanodes.  In the future we might 
+   * mark the block as corrupt.  In the future we might 
    * check the blocks are actually corrupt. 
    */
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
@@ -361,7 +361,7 @@
       DatanodeInfo[] nodes = blocks[i].getLocations();
       for (int j = 0; j < nodes.length; j++) {
         DatanodeInfo dn = nodes[j];
-        namesystem.invalidateBlock(blk, dn);
+        namesystem.markBlockAsCorrupt(blk, dn);
       }
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java?rev=655660&r1=655659&r2=655660&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java Mon May
12 14:52:02 2008
@@ -25,6 +25,10 @@
 import java.net.URLConnection;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -137,4 +141,88 @@
     
     cluster.shutdown();
   }
+
+  void corruptReplica(String blockName, int replica) throws IOException {
+    Random random = new Random();
+    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+    for (int i=replica*2; i<replica*2+2; i++) {
+      File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
+                               blockName);
+      if (blockFile.exists()) {
+        // Corrupt replica by writing random bytes into replica
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        FileChannel channel = raFile.getChannel();
+        String badString = "BADBAD";
+        int rand = random.nextInt((int)channel.size()/2);
+        raFile.seek(rand);
+        raFile.write(badString.getBytes());
+        raFile.close();
+      }
+    }
+  }
+
+  public void testBlockCorruptionPolicy() throws IOException {
+    Configuration conf = new Configuration();
+    Random random = new Random();
+    FileSystem fs = null;
+    DFSClient dfsClient = null;
+    LocatedBlocks blocks = null;
+    int blockCount = 0;
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    Path file1 = new Path("/tmp/testBlockVerification/file1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
+    String block = DFSTestUtil.getFirstBlock(fs, file1).toString();
+    
+    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+                                        cluster.getNameNodePort()), conf);
+    blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    blockCount = blocks.get(0).getLocations().length;
+    assertTrue(blockCount == 3);
+    assertTrue(blocks.get(0).isCorrupt() == false);
+
+    // Corrupt random replica of block 
+    corruptReplica(block, random.nextInt(3));
+    cluster.shutdown();
+
+    // Restart the cluster hoping the corrupt block to be reported
+    // We have 2 good replicas and block is not corrupt
+    cluster = new MiniDFSCluster(conf, 3, false, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+                                        cluster.getNameNodePort()), conf);
+    blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    blockCount = blocks.get(0).getLocations().length;
+    assertTrue (blockCount == 2);
+    assertTrue(blocks.get(0).isCorrupt() == false);
+  
+    // Corrupt all replicas. Now, block should be marked as corrupt
+    // and we should get all the replicas 
+    corruptReplica(block, 0);
+    corruptReplica(block, 1);
+    corruptReplica(block, 2);
+
+    // Read the file to trigger reportBadBlocks by client
+    try {
+      IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
+                        conf, true);
+    } catch (IOException e) {
+      // Ignore exception
+    }
+
+    // We now expect the block to be marked as corrupt, and we get back all
+    // of its replicas
+    blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    blockCount = blocks.get(0).getLocations().length;
+    assertTrue (blockCount == 3);
+    assertTrue(blocks.get(0).isCorrupt() == true);
+
+    cluster.shutdown();
+  }
 }



Mime
View raw message