hadoop-hdfs-commits mailing list archives

From: hair...@apache.org
Subject: svn commit: r926471 - in /hadoop/hdfs/branches/branch-0.21: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
Date: Tue, 23 Mar 2010 06:07:20 GMT
Author: hairong
Date: Tue Mar 23 06:07:19 2010
New Revision: 926471

URL: http://svn.apache.org/viewvc?rev=926471&view=rev
Log:
Merge -c 926469 to move the change of HDFS-520 from trunk to the 0.21 branch.

Modified:
    hadoop/hdfs/branches/branch-0.21/CHANGES.txt
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=926471&r1=926470&r2=926471&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Tue Mar 23 06:07:19 2010
@@ -329,6 +329,8 @@ Trunk (unreleased changes)
     HDFS-127. Reset failure count in DFSClient for each block acquiring
     operation.  (Igor Bolotin via szetszwo)
 
+    HDFS-520. Create new tests for block recovery. (hairong)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=926471&r1=926470&r2=926471&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Mar 23 06:07:19 2010
@@ -130,7 +130,7 @@ public class DFSClient implements FSCons
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  private final ClientProtocol namenode;
+  final ClientProtocol namenode;
   private final ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
@@ -360,7 +360,7 @@ public class DFSClient implements FSCons
     return defaultReplication;
   }
 
-  private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
+  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
       String src, long start, long length) throws IOException {
     try {
       return namenode.getBlockLocations(src, start, length);

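Relaxing namenode and callGetBlockLocations from private to package-private lets test code in org.apache.hadoop.hdfs query block locations through the client directly. A minimal sketch of the resulting wait-for-recovery idiom, written as a fragment inside a JUnit test method that declares throws Exception; fs and size are illustrative names for an open DistributedFileSystem and the number of bytes written (the real loop is in the TestLeaseRecovery2 change further down in this commit):

    // Poll the namenode until lease recovery finalizes the file's last block.
    LocatedBlocks blocks;
    do {
      Thread.sleep(1000);                       // give recovery time to progress
      blocks = DFSClient.callGetBlockLocations(
          fs.dfs.namenode, "/some/file", 0L, size);
    } while (blocks.isUnderConstruction());
    assertEquals(size, blocks.getFileLength()); // recovered length matches what was written
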
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=926471&r1=926470&r2=926471&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar 23 06:07:19 2010
@@ -229,10 +229,23 @@ public class DataNode extends Configured
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
+    this(conf, dataDirs, (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+                       DatanodeProtocol.versionID,
+                       NameNode.getAddress(conf),
+                       conf));
+  }
+
+  /**
+   * Create the DataNode given a configuration, an array of dataDirs,
+   * and a namenode proxy
+   */
+  DataNode(final Configuration conf,
+           final AbstractList<File> dataDirs,
+           final DatanodeProtocol namenode) throws IOException {
     super(conf);
     DataNode.setDataNode(this);
     try {
-      startDataNode(conf, dataDirs);
+      startDataNode(conf, dataDirs, namenode);
     } catch (IOException ie) {
       shutdown();
       throw ie;
@@ -251,7 +264,8 @@ public class DataNode extends Configured
    * @throws IOException
    */
   void startDataNode(Configuration conf, 
-                     AbstractList<File> dataDirs
+                     AbstractList<File> dataDirs,
+                     DatanodeProtocol namenode
                      ) throws IOException {
     // use configured nameserver & interface to get local hostname
     if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
@@ -282,11 +296,8 @@ public class DataNode extends Configured
     this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
 
     // connect to name node
-    this.namenode = (DatanodeProtocol) 
-      RPC.waitForProxy(DatanodeProtocol.class,
-                       DatanodeProtocol.versionID,
-                       nameNodeAddr, 
-                       conf);
+    this.namenode = namenode;
+    
     // get version and id info from the name-node
     NamespaceInfo nsInfo = handshake();
     StartupOption startOpt = getStartupOption(conf);
@@ -1585,7 +1596,7 @@ public class DataNode extends Configured
   }
 
   /** A convenient class used in block recovery */
-  private static class BlockRecord { 
+  static class BlockRecord { 
     final DatanodeID id;
     final InterDatanodeProtocol datanode;
     final ReplicaRecoveryInfo rInfo;
@@ -1646,7 +1657,7 @@ public class DataNode extends Configured
   }
 
   /** Block synchronization */
-  private void syncBlock(RecoveringBlock rBlock,
+  void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
     Block block = rBlock.getBlock();
     long recoveryId = rBlock.getNewGenerationStamp();

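The DataNode change above is a constructor-injection refactor: the NameNode RPC proxy becomes a constructor argument, so the new block-recovery tests can presumably hand the DataNode a stub or mock DatanodeProtocol instead of depending on a live NameNode. A generic, self-contained sketch of that pattern follows; none of these type or method names are Hadoop APIs, they only mirror the shape of the change:

    // Generic constructor-injection illustration (not Hadoop code).
    interface NamenodeProxy {
      String register(String nodeId);
    }

    class RpcNamenodeProxy implements NamenodeProxy {
      RpcNamenodeProxy(String address) { /* would open an RPC connection here */ }
      public String register(String nodeId) { return "registered:" + nodeId; }
    }

    class Node {
      private final NamenodeProxy namenode;

      // Production path: build the real proxy from the address, then delegate.
      Node(String nameNodeAddress) {
        this(new RpcNamenodeProxy(nameNodeAddress));
      }

      // Test path: accept any NamenodeProxy, e.g. a hand-written stub or a mock.
      Node(NamenodeProxy namenode) {
        this.namenode = namenode;
      }

      String start(String nodeId) {
        return namenode.register(nodeId);   // tests can observe this call via the stub
      }
    }
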
Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=926471&r1=926470&r2=926471&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Tue Mar 23 06:07:19 2010
@@ -17,137 +17,218 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
-public class TestLeaseRecovery2 extends junit.framework.TestCase {
+public class TestLeaseRecovery2 {
   {
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
   }
 
-  static final long BLOCK_SIZE = 1024;
-  static final int FILE_SIZE = 1024*16;
+  static final private long BLOCK_SIZE = 1024;
+  static final private int FILE_SIZE = (int)BLOCK_SIZE*2;
   static final short REPLICATION_NUM = (short)3;
   static byte[] buffer = new byte[FILE_SIZE];
 
-  public void testBlockSynchronization() throws Exception {
-    final long softLease = 1000;
-    final long hardLease = 60 * 60 *1000;
-    final short repl = 3;
-    final Configuration conf = new HdfsConfiguration();
-    final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+  static private MiniDFSCluster cluster;
+  static private DistributedFileSystem dfs;
+  final static private Configuration conf = new HdfsConfiguration();
+  final static private int BUF_SIZE = conf.getInt("io.file.buffer.size", 4096);
+ 
+  final static private long SHORT_LEASE_PERIOD = 1000L;
+  final static private long LONG_LEASE_PERIOD = 60*60*SHORT_LEASE_PERIOD;
+ 
+  /** start a dfs cluster
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void startUp() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt("dfs.heartbeat.interval", 1);
-  //  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 16);
 
-    MiniDFSCluster cluster = null;
-    DistributedFileSystem dfs = null;
-    byte[] actual = new byte[FILE_SIZE];
+    cluster = new MiniDFSCluster(conf, 5, true, null);
+    cluster.waitActive();
+    dfs = (DistributedFileSystem)cluster.getFileSystem();
+  }
+
+  /**
+   * stop the cluster
+   * @throws IOException
+   */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    IOUtils.closeStream(dfs);
+    if (cluster != null) {cluster.shutdown();}
+  }
+
+  /**
+   * This test makes the client stop renewing its lease and sets the
+   * hard lease expiration period to a short 1 second, thus triggering
+   * lease expiration while the client is still alive.
+   *
+   * The test makes sure that lease recovery completes and that the client
+   * fails if it continues to write to the file.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testHardLeaseRecovery() throws Exception {
+    //create a file
+    String filestr = "/hardLeaseRecovery";
+    AppendTestUtil.LOG.info("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true,
+        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    // write bytes into the file.
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    AppendTestUtil.LOG.info("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+
+    // kill the lease renewal thread
+    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+    dfs.dfs.leasechecker.interruptAndJoin();
+
+    // set the hard limit to be 1 second
+    cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
+
+    // wait for lease recovery to complete
+    LocatedBlocks locatedBlocks;
+    do {
+      Thread.sleep(SHORT_LEASE_PERIOD);
+      locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
+        filestr, 0L, size);
+    } while (locatedBlocks.isUnderConstruction());
+    assertEquals(size, locatedBlocks.getFileLength());
 
+    // make sure that the writer thread gets killed
     try {
-      cluster = new MiniDFSCluster(conf, 5, true, null);
-      cluster.waitActive();
+      stm.write('b');
+      stm.close();
+      fail("Writer thread should have been killed");
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // verify data
+    AppendTestUtil.LOG.info(
+        "File size is good. Now validating sizes from datanodes...");
+    AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
+  }
 
-      //create a file
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
-      // create a random file name
-      String filestr = "/foo" + AppendTestUtil.nextInt();
-      System.out.println("filestr=" + filestr);
-      Path filepath = new Path(filestr);
-      FSDataOutputStream stm = dfs.create(filepath, true,
-          bufferSize, repl, BLOCK_SIZE);
-      assertTrue(dfs.dfs.exists(filestr));
-
-      // write random number of bytes into it.
-      int size = AppendTestUtil.nextInt(FILE_SIZE);
-      System.out.println("size=" + size);
-      stm.write(buffer, 0, size);
-
-      // hflush file
-      AppendTestUtil.LOG.info("hflush");
-      stm.hflush();
-      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leasechecker.interruptAndJoin();
-
-      // set the soft limit to be 1 second so that the
-      // namenode triggers lease recovery on next attempt to write-for-open.
-      cluster.setLeasePeriod(softLease, hardLease);
-
-      // try to re-open the file before closing the previous handle. This
-      // should fail but will trigger lease recovery.
-      {
-        Configuration conf2 = new HdfsConfiguration(conf);
-        String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
-        UnixUserGroupInformation.saveToConf(conf2,
-            UnixUserGroupInformation.UGI_PROPERTY_NAME,
-            new UnixUserGroupInformation(username, new String[]{"supergroup"}));
-        FileSystem dfs2 = FileSystem.get(conf2);
+  /**
+   * This test makes the client stop renewing its lease and sets the
+   * soft lease expiration period to a short 1 second, so that soft lease
+   * expiration is triggered immediately when another client tries to
+   * create the same file.
+   *
+   * The test makes sure that the lease recovery completes.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSoftLeaseRecovery() throws Exception {
+    //create a file
+    // create a random file name
+    String filestr = "/foo" + AppendTestUtil.nextInt();
+    System.out.println("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true,
+        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    // write random number of bytes into it.
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    System.out.println("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+    dfs.dfs.leasechecker.interruptAndJoin();
+
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery on next attempt to write-for-open.
+    cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);
+
+    // try to re-open the file before closing the previous handle. This
+    // should fail but will trigger lease recovery.
+    {
+      Configuration conf2 = new HdfsConfiguration(conf);
+      String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
+      UnixUserGroupInformation.saveToConf(conf2,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME,
+          new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+      FileSystem dfs2 = FileSystem.get(conf2);
   
-        boolean done = false;
-        for(int i = 0; i < 10 && !done; i++) {
-          AppendTestUtil.LOG.info("i=" + i);
-          try {
-            dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
-            fail("Creation of an existing file should never succeed.");
-          } catch (IOException ioe) {
-            final String message = ioe.getMessage();
-            if (message.contains("file exists")) {
-              AppendTestUtil.LOG.info("done", ioe);
-              done = true;
-            }
-            else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
-              AppendTestUtil.LOG.info("GOOD! got " + message);
-            }
-            else {
-              AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
-            }
+      boolean done = false;
+      for(int i = 0; i < 10 && !done; i++) {
+        AppendTestUtil.LOG.info("i=" + i);
+        try {
+          dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+          fail("Creation of an existing file should never succeed.");
+        } catch (IOException ioe) {
+          final String message = ioe.getMessage();
+          if (message.contains("file exists")) {
+            AppendTestUtil.LOG.info("done", ioe);
+            done = true;
           }
-
-          if (!done) {
-            AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
-            try {Thread.sleep(5000);} catch (InterruptedException e) {}
+          else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+            AppendTestUtil.LOG.info("GOOD! got " + message);
+          }
+          else {
+            AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
           }
         }
-        assertTrue(done);
-      }
-
-      AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
-          + "Validating its contents now...");
 
-      // verify that file-size matches
-      long fileSize = dfs.getFileStatus(filepath).getLen();
-      assertTrue("File should be " + size + " bytes, but is actually " +
-                 " found to be " + fileSize + " bytes", fileSize == size);
-
-      // verify that there is enough data to read.
-      System.out.println("File size is good. Now validating sizes from datanodes...");
-      FSDataInputStream stmin = dfs.open(filepath);
-      stmin.readFully(0, actual, 0, size);
-      stmin.close();
-    }
-    finally {
-      try {
-        if(dfs != null) dfs.close();
-        if (cluster != null) {cluster.shutdown();}
-      } catch (Exception e) {
-        // ignore
+        if (!done) {
+          AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+          try {Thread.sleep(5000);} catch (InterruptedException e) {}
+        }
       }
+      assertTrue(done);
     }
+
+    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
+        + "Validating its contents now...");
+
+    // verify that file-size matches
+    long fileSize = dfs.getFileStatus(filepath).getLen();
+    assertTrue("File should be " + size + " bytes, but is actually " +
+               " found to be " + fileSize + " bytes", fileSize == size);
+
+    // verify data
+    AppendTestUtil.LOG.info("File size is good. " +
+                     "Now validating data and sizes from datanodes...");
+    AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
   }
 }


