hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r724883 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
Date Tue, 09 Dec 2008 20:55:51 GMT
Author: hairong
Date: Tue Dec  9 12:55:51 2008
New Revision: 724883

URL: http://svn.apache.org/viewvc?rev=724883&view=rev
Log:
HADOOP-4702. Failed block replication leaves an incomplete block in receiver's tmp data directory.
Contributed by Hairong Kuang.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=724883&r1=724882&r2=724883&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec  9 12:55:51 2008
@@ -1368,7 +1368,7 @@
     HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
     (szetszwo)
 
-    HADOOP-4659. Root cause of connection failure is being ost to code that
+    HADOOP-4659. Root cause of connection failure is being lost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
     HADOOP-4614. Lazily open segments when merging map spills to avoid using
@@ -1401,6 +1401,9 @@
 
     HADOOP-4742. Replica gets deleted by mistake. (Wang Xu via hairong)
 
+    HADOOP-4702. Failed block replication leaves an incomplete block in
+    receiver's tmp data directory. (hairong)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=724883&r1=724882&r2=724883&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec  9 12:55:51 2008
@@ -86,11 +86,11 @@
       this.isRecovery = isRecovery;
       this.clientName = clientName;
       this.offsetInBlock = 0;
+      this.srcDataNode = srcDataNode;
+      this.datanode = datanode;
       this.checksum = DataChecksum.newDataChecksum(in);
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
-      this.srcDataNode = srcDataNode;
-      this.datanode = datanode;
       //
       // Open local disk out
       //
@@ -109,11 +109,15 @@
       }
     } catch(IOException ioe) {
       IOUtils.closeStream(this);
+      removeBlock();
+      
+      // check if there is a disk error
       IOException cause = FSDataset.getCauseIfDiskError(ioe);
       if (cause != null) { // possible disk error
         ioe = cause;
-        datanode.checkDiskError(ioe);
+        datanode.checkDiskError(ioe); // may throw an exception here
       }
+      
       throw ioe;
     }
   }
@@ -553,6 +557,7 @@
       if (responder != null) {
         responder.interrupt();
       }
+      removeBlock();
       throw ioe;
     } finally {
       if (responder != null) {
@@ -566,6 +571,15 @@
     }
   }
 
+  /** Remove a partial block 
+   * if this write is for a replication request (and not from a client)
+   */
+  private void removeBlock() throws IOException {
+    if (clientName.length() == 0) { // not client write
+      datanode.data.unfinalizeBlock(block);
+    }
+  }
+
   /**
    * Sets the file pointer in the local block file to the specified value.
    */

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=724883&r1=724882&r2=724883&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Dec  9 12:55:51 2008
@@ -1164,12 +1164,46 @@
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    ongoingCreates.remove(b);
+    // remove the block from in-memory data structure
+    ActiveFile activefile = ongoingCreates.remove(b);
+    if (activefile == null) {
+      return;
+    }
     volumeMap.remove(b);
-    DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    
+    // delete the on-disk temp file
+    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
+      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    }
   }
 
   /**
+   * Remove a block from disk
+   * @param blockFile block file
+   * @param metaFile block meta file
+   * @param b a block
+   * @return true if on-disk files are deleted; false otherwise
+   */
+  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
+    if (blockFile == null) {
+      DataNode.LOG.warn("No file exists for block: " + b);
+      return true;
+    }
+    
+    if (!blockFile.delete()) {
+      DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
+      return false;
+    } else { // remove the meta file
+      if (metaFile != null && !metaFile.delete()) {
+        DataNode.LOG.warn(
+            "Not able to delete the meta block file: " + metaFile);
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /**
    * Return a table of block data
    */
   public Block[] getBlockReport() {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=724883&r1=724882&r2=724883&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Dec  9 12:55:51 2008
@@ -17,17 +17,24 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.io.Text;
 
 import junit.framework.TestCase;
 
-/** Test if a datanode can handle disk error correctly*/
+/** Test if a datanode can correctly handle errors during block read/write*/
 public class TestDiskError extends TestCase {
   public void testShutdown() throws Exception {
     // bring up a cluster of 3
@@ -52,6 +59,7 @@
         Path fileName = new Path("/test.txt"+i);
         DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
         DFSTestUtil.waitReplication(fs, fileName, (short)2);
+        fs.delete(fileName, true);
       }
     } finally {
       // restore its old permission
@@ -60,4 +68,76 @@
       cluster.shutdown();
     }
   }
+  
+  public void testReplicationError() throws Exception {
+    // bring up a cluster of 1
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    
+    try {
+      // create a file of replication factor of 1
+      final Path fileName = new Path("/test.txt");
+      final int fileLen = 1;
+      DFSTestUtil.createFile(fs, fileName, 1, (short)1, 1L);
+      DFSTestUtil.waitReplication(fs, fileName, (short)1);
+
+      // get the block belonged to the created file
+      LocatedBlocks blocks = cluster.getNameNode().namesystem.getBlockLocations(
+          fileName.toString(), 0, (long)fileLen);
+      assertEquals(blocks.locatedBlockCount(), 1);
+      LocatedBlock block = blocks.get(0);
+      
+      // bring up a second datanode
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.waitActive();
+      final int sndNode = 1;
+      DataNode datanode = cluster.getDataNodes().get(sndNode);
+      
+      // replicate the block to the second datanode
+      InetSocketAddress target = datanode.getSelfAddr();
+      Socket s = new Socket(target.getAddress(), target.getPort());
+        //write the header.
+      DataOutputStream out = new DataOutputStream(
+          s.getOutputStream());
+
+      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
+      out.write( DataTransferProtocol.OP_WRITE_BLOCK );
+      out.writeLong( block.getBlock().getBlockId());
+      out.writeLong( block.getBlock().getGenerationStamp() );
+      out.writeInt(1);
+      out.writeBoolean( false );       // recovery flag
+      Text.writeString( out, "" );
+      out.writeBoolean(false); // Not sending src node information
+      out.writeInt(0);
+      
+      // write check header
+      out.writeByte( 1 );
+      out.writeInt( 512 );
+
+      out.flush();
+
+      // close the connection before sending the content of the block
+      out.close();
+      
+      // the temporary block & meta files should be deleted
+      String dataDir = cluster.getDataDirectory();
+      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "tmp");
+      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "tmp");
+      while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
+        Thread.sleep(100);
+      }
+      
+      // then increase the file's replication factor
+      fs.setReplication(fileName, (short)2);
+      // replication should succeed
+      DFSTestUtil.waitReplication(fs, fileName, (short)1);
+      
+      // clean up the file
+      fs.delete(fileName, false);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }



Mime
View raw message