hadoop-common-commits mailing list archives

From: rang...@apache.org
Subject: svn commit: r659235 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date: Thu, 22 May 2008 20:12:31 GMT
Author: rangadi
Date: Thu May 22 13:12:30 2008
New Revision: 659235

URL: http://svn.apache.org/viewvc?rev=659235&view=rev
Log:
HADOOP-3035. During block transfers between datanodes, the receiving
datanode can now report corrupt replicas received from the source node
to the namenode. (Lohit Vijayarenu via rangadi)
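
In wire terms, the change adds a single optional field to the
OP_WRITE_BLOCK header, written after the client name and before the
downstream-target count. A minimal sketch of both ends, condensed from
the hunks below (variable names follow the patch; the surrounding
header fields are omitted):

    // sender: flag the optional field, then serialize it
    out.writeBoolean(srcDataNode != null);    // is src node info present?
    if (srcDataNode != null) {
      srcDataNode.write(out);                 // DatanodeInfo of the sending datanode
    }

    // receiver: the mirror image
    DatanodeInfo srcDataNode = null;
    boolean hasSrcDataNode = in.readBoolean();
    if (hasSrcDataNode) {
      srcDataNode = new DatanodeInfo();
      srcDataNode.readFields(in);
    }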

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 22 13:12:30 2008
@@ -166,6 +166,10 @@
     that it does not block user. DataNode misses heartbeats in large
     nodes otherwise. (Johan Oskarsson via rangadi)
 
+    HADOOP-3035. During block transfers between datanodes, the receiving
+    datanode can now report corrupt replicas received from the source node
+    to the namenode. (Lohit Vijayarenu via rangadi)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu May 22 13:12:30 2008
@@ -2299,6 +2299,7 @@
         out.writeInt( nodes.length );
         out.writeBoolean( recoveryFlag );       // recovery flag
         Text.writeString( out, client );
+        out.writeBoolean(false); // Not sending src node information
         out.writeInt( nodes.length - 1 );
         for (int i = 1; i < nodes.length; i++) {
           nodes[i].write(out);
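
DFSClient hard-codes false here because a client-initiated pipeline
has no upstream datanode to blame for a corrupt replica. The flag is
only set on the datanode-to-datanode transfer path, shown later in
this commit:

    // from the transfer path in DataNode.java (see the hunk further down)
    DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
    ...
    out.writeBoolean(true);    // sending src node information
    srcNode.write(out);        // Write src node DatanodeInfo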

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu May 22 13:12:30 2008
@@ -1126,6 +1126,7 @@
      */
     private void writeBlock(DataInputStream in) throws IOException {
       xceiverCount.incr();
+      DatanodeInfo srcDataNode = null;
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
                 " tcp no delay " + s.getTcpNoDelay());
       //
@@ -1138,6 +1139,11 @@
       int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       boolean isRecovery = in.readBoolean(); // is this part of recovery?
       String client = Text.readString(in); // working on behalf of this client
+      boolean hasSrcDataNode = in.readBoolean(); // is src node info present
+      if (hasSrcDataNode) {
+        srcDataNode = new DatanodeInfo();
+        srcDataNode.readFields(in);
+      }
       int numTargets = in.readInt();
       if (numTargets < 0) {
         throw new IOException("Mislabelled incoming datastream.");
@@ -1159,7 +1165,7 @@
       try {
         // open a block receiver and check if the block does not exist
         blockReceiver = new BlockReceiver(block, in, 
-            s.getInetAddress().toString(), isRecovery, client);
+            s.getInetAddress().toString(), isRecovery, client, srcDataNode);
 
         // get a connection back to the previous target
         replyOut = new DataOutputStream(
@@ -1196,6 +1202,10 @@
             mirrorOut.writeInt( pipelineSize );
             mirrorOut.writeBoolean( isRecovery );
             Text.writeString( mirrorOut, client );
+            mirrorOut.writeBoolean(hasSrcDataNode);
+            if (hasSrcDataNode) { // pass src node information
+              srcDataNode.write(mirrorOut);
+            }
             mirrorOut.writeInt( targets.length - 1 );
             for ( int i = 1; i < targets.length; i++ ) {
               targets[i].write( mirrorOut );
@@ -1419,7 +1429,7 @@
       try {
         // open a block receiver and check if the block does not exist
          blockReceiver = new BlockReceiver(
-            block, in, s.getRemoteSocketAddress().toString(), false, "");
+            block, in, s.getRemoteSocketAddress().toString(), false, "", null);
 
         // receive a block
         blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
@@ -2250,10 +2260,11 @@
     private FSDataset.BlockWriteStreams streams;
     private boolean isRecovery = false;
     private String clientName;
+    DatanodeInfo srcDataNode = null;
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
-                  boolean isRecovery, String clientName)
-        throws IOException {
+                  boolean isRecovery, String clientName, 
+                  DatanodeInfo srcDataNode) throws IOException {
       try{
         this.block = block;
         this.in = in;
@@ -2264,6 +2275,7 @@
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
+        this.srcDataNode = srcDataNode;
         //
         // Open local disk out
         //
@@ -2352,6 +2364,18 @@
         checksum.update(dataBuf, dataOff, chunkLen);
 
         if (!checksum.compare(checksumBuf, checksumOff)) {
+          if (srcDataNode != null) {
+            try {
+              LOG.info("report corrupt block " + block + " from datanode " +
+                        srcDataNode + " to namenode");
+              LocatedBlock lb = new LocatedBlock(block, 
+                                              new DatanodeInfo[] {srcDataNode});
+              namenode.reportBadBlocks(new LocatedBlock[] {lb});
+            } catch (IOException e) {
+              LOG.warn("Failed to report bad block " + block + 
+                        " from datanode " + srcDataNode + " to namenode");
+            }
+          }
           throw new IOException("Unexpected checksum mismatch " + 
                                 "while writing " + block + " from " + inAddr);
         }
@@ -2770,6 +2794,7 @@
                                                             SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
+        DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
 
         //
         // Header info
@@ -2781,6 +2806,8 @@
         out.writeInt(0);           // no pipelining
         out.writeBoolean(false);   // not part of recovery
         Text.writeString(out, ""); // client
+        out.writeBoolean(true); // sending src node information
+        srcNode.write(out); // Write src node DatanodeInfo
         // write targets
         out.writeInt(targets.length - 1);
         for (int i = 1; i < targets.length; i++) {
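
The checksum-mismatch handler above is the heart of the change. A
condensed sketch of the new control flow, with the surrounding
BlockReceiver scaffolding stripped away: reporting is best-effort, so
a failure to reach the namenode is only logged, and the write fails
either way.

    if (!checksum.compare(checksumBuf, checksumOff)) {
      if (srcDataNode != null) {
        try {
          // name the bad replica and the datanode it came from
          LocatedBlock lb = new LocatedBlock(block,
                                             new DatanodeInfo[] {srcDataNode});
          namenode.reportBadBlocks(new LocatedBlock[] {lb});
        } catch (IOException e) {
          LOG.warn("Failed to report bad block " + block +
                   " from datanode " + srcDataNode + " to namenode");
        }
      }
      throw new IOException("Unexpected checksum mismatch " +
                            "while writing " + block + " from " + inAddr);
    }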

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu May 22 13:12:30 2008
@@ -101,11 +101,11 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 10:
-   *    DFSClient also sends non-interleaved checksum and data while writing
-   *    to DFS.
+   * Version 11:
+   *    OP_WRITE_BLOCK sends a boolean. If its value is true, an additional
+   *    DatanodeInfo of the client requesting the transfer is also sent.
    */
-  public static final int DATA_TRANSFER_VERSION = 10;
+  public static final int DATA_TRANSFER_VERSION = 11;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
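
Bumping DATA_TRANSFER_VERSION keeps old and new daemons from silently
misparsing each other's streams: every data-transfer connection leads
with the version number, and the receiver rejects a mismatch before
reading anything else. Roughly along these lines (a paraphrase of the
existing stream handler, not part of this diff):

    short version = in.readShort();   // first field on every data-transfer connection
    if (version != FSConstants.DATA_TRANSFER_VERSION) {
      throw new IOException("Version Mismatch");
    }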

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu May 22 13:12:30 2008
@@ -22,6 +22,10 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.io.RandomAccessFile;
 
 import javax.security.auth.login.LoginException;
 
@@ -547,6 +551,41 @@
   }
 
   /*
+   * Corrupt a block on all datanodes
+   */
+  void corruptBlockOnDataNodes(String blockName) throws Exception {
+    for (int i=0; i < dataNodes.size(); i++)
+      corruptBlockOnDataNode(i, blockName);
+  }
+
+  /*
+   * Corrupt a block on a particular datanode
+   */
+  boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+    Random random = new Random();
+    boolean corrupted = false;
+    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+    if (i < 0 || i >= dataNodes.size())
+      return false;
+    for (int dn = i*2; dn < i*2+2; dn++) {
+      File blockFile = new File(baseDir, "data" + (dn+1) + "/current/" +
+                                blockName);
+      if (blockFile.exists()) {
+        // Corrupt replica by writing random bytes into replica
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        FileChannel channel = raFile.getChannel();
+        String badString = "BADBAD";
+        int rand = random.nextInt((int)channel.size()/2);
+        raFile.seek(rand);
+        raFile.write(badString.getBytes());
+        raFile.close();
+        corrupted = true;
+      }
+    }
+    return corrupted;
+  }
+
+  /*
    * Shutdown a particular datanode
    */
   boolean stopDataNode(int i) {
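
One detail the new helper relies on: MiniDFSCluster gives each
simulated datanode two storage directories under
test.build.data/dfs/data, so datanode 0 owns data1 and data2, datanode
1 owns data3 and data4, and so on. That is why datanode i scans
directories data(2i+1) and data(2i+2) for the replica file:

    // datanode i -> storage dirs data(2*i+1) and data(2*i+2)
    for (int dn = i*2; dn < i*2+2; dn++) {
      File blockFile = new File(baseDir, "data" + (dn+1) + "/current/" +
                                blockName);
      ...
    }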

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Thu May 22 13:12:30 2008
@@ -171,6 +171,7 @@
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);           // number of downstream targets
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
@@ -189,6 +190,7 @@
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
 
     // bad number of targets
     sendOut.writeInt(-1-random.nextInt(oneMil));
@@ -204,6 +206,7 @@
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);
@@ -230,6 +233,7 @@
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeBoolean(false);   // recoveryFlag
     Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);    // checksum size
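
Taken together, each hand-built OP_WRITE_BLOCK stream in this test now
carries the extra boolean between the client ID and the downstream
target count. The tail of the header, as the first hunk above writes
it (the leading version, opcode and block fields are not shown in
these hunks and are omitted here as well):

    sendOut.writeInt(0);                  // targets in pipeline
    sendOut.writeBoolean(false);          // recoveryFlag
    Text.writeString(sendOut, "cl");      // clientID
    sendOut.writeBoolean(false);          // no src node info (new in v11)
    sendOut.writeInt(0);                  // number of downstream targets
    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);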

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Thu May 22 13:12:30 2008
@@ -126,6 +126,51 @@
     fileSys.delete(name, true);
     assertTrue(!fileSys.exists(name));
   }
+
+  /* 
+   * Test that the receiving datanode reports corrupt replicas during a replication request
+   */
+  public void testBadBlockReportOnTransfer() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = null;
+    DFSClient dfsClient = null;
+    LocatedBlocks blocks = null;
+    int replicaCount = 0;
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                              cluster.getNameNodePort()), conf);
+  
+    // Create file with replication factor of 1
+    Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+    DFSTestUtil.waitReplication(fs, file1, (short)1);
+  
+    // Corrupt the block belonging to the created file
+    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
+    cluster.corruptBlockOnDataNodes(block);
+  
+    // Increase replication factor, this should invoke transfer request
+    // Receiving datanode fails on checksum and reports it to namenode
+    fs.setReplication(file1, (short)2);
+  
+    // Now get block details and check if the block is corrupt
+    blocks = dfsClient.namenode.
+              getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    while (!blocks.get(0).isCorrupt()) {
+      try {
+        LOG.info("Waiting until block is marked as corrupt...");
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+      }
+      blocks = dfsClient.namenode.
+                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    }
+    replicaCount = blocks.get(0).getLocations().length;
+    assertEquals(1, replicaCount);
+    cluster.shutdown();
+  }
   
   /**
    * Tests replication in DFS.
@@ -330,4 +375,5 @@
       }
     }
   }  
+  
 }


