hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r926469 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/unit/org/apache/hadoop/hdfs/server/datanode/
Date Tue, 23 Mar 2010 06:00:12 GMT
Author: hairong
Date: Tue Mar 23 06:00:11 2010
New Revision: 926469

URL: http://svn.apache.org/viewvc?rev=926469&view=rev
Log:
HDFS-520. Create new tests for block recovery. Contributed by Hairong.

Added:
    hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/
    hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=926469&r1=926468&r2=926469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Mar 23 06:00:11 2010
@@ -544,6 +544,8 @@ Release 0.21.0 - Unreleased
     HDFS-127. Reset failure count in DFSClient for each block acquiring
     operation.  (Igor Bolotin via szetszwo)
 
+    HDFS-520. Create new tests for block recovery. (hairong)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=926469&r1=926468&r2=926469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar
23 06:00:11 2010
@@ -228,6 +228,19 @@ public class DataNode extends Configured
    */
   DataNode(final Configuration conf, 
            final AbstractList<File> dataDirs) throws IOException {
+    this(conf, dataDirs, (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+                       DatanodeProtocol.versionID,
+                       NameNode.getAddress(conf), 
+                       conf));
+  }
+  
+  /**
+   * Create the DataNode given a configuration, an array of dataDirs,
+   * and a namenode proxy
+   */
+  DataNode(final Configuration conf, 
+           final AbstractList<File> dataDirs,
+           final DatanodeProtocol namenode) throws IOException {
     super(conf);
 
     UserGroupInformation.setConfiguration(conf);
@@ -238,7 +251,7 @@ public class DataNode extends Configured
     DataNode.setDataNode(this);
     
     try {
-      startDataNode(conf, dataDirs);
+      startDataNode(conf, dataDirs, namenode);
     } catch (IOException ie) {
       shutdown();
      throw ie;
@@ -256,7 +269,8 @@ public class DataNode extends Configured
    * @throws IOException
    */
   void startDataNode(Configuration conf, 
-                     AbstractList<File> dataDirs
+                     AbstractList<File> dataDirs,
+                     DatanodeProtocol namenode
                      ) throws IOException {
     // use configured nameserver & interface to get local hostname
     if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
@@ -287,11 +301,8 @@ public class DataNode extends Configured
     this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
 
     // connect to name node
-    this.namenode = (DatanodeProtocol) 
-      RPC.waitForProxy(DatanodeProtocol.class,
-                       DatanodeProtocol.versionID,
-                       nameNodeAddr, 
-                       conf);
+    this.namenode = namenode;
+    
     // get version and id info from the name-node
     NamespaceInfo nsInfo = handshake();
     StartupOption startOpt = getStartupOption(conf);
@@ -1589,7 +1600,7 @@ public class DataNode extends Configured
   }
 
   /** A convenient class used in block recovery */
-  private static class BlockRecord { 
+  static class BlockRecord { 
     final DatanodeID id;
     final InterDatanodeProtocol datanode;
     final ReplicaRecoveryInfo rInfo;
@@ -1650,7 +1661,7 @@ public class DataNode extends Configured
   }
 
   /** Block synchronization */
-  private void syncBlock(RecoveringBlock rBlock,
+  void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
     Block block = rBlock.getBlock();
     long recoveryId = rBlock.getNewGenerationStamp();

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=926469&r1=926468&r2=926469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Tue Mar
23 06:00:11 2010
@@ -17,146 +17,225 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
-public class TestLeaseRecovery2 extends junit.framework.TestCase {
+public class TestLeaseRecovery2 {
   {
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
   }
 
-  static final long BLOCK_SIZE = 1024;
-  static final int FILE_SIZE = 1024*16;
+  static final private long BLOCK_SIZE = 1024;
+  static final private int FILE_SIZE = (int)BLOCK_SIZE*2;
   static final short REPLICATION_NUM = (short)3;
   static byte[] buffer = new byte[FILE_SIZE];
   
   static private String fakeUsername = "fakeUser1";
   static private String fakeGroup = "supergroup";
 
-  public void testBlockSynchronization() throws Exception {
-    final long softLease = 1000;
-    final long hardLease = 60 * 60 *1000;
-    final short repl = 3;
-    final Configuration conf = new HdfsConfiguration();
-    final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+  static private MiniDFSCluster cluster;
+  static private DistributedFileSystem dfs;
+  final static private Configuration conf = new HdfsConfiguration();
+  final static private int BUF_SIZE = conf.getInt("io.file.buffer.size", 4096);
+  
+  final static private long SHORT_LEASE_PERIOD = 1000L;
+  final static private long LONG_LEASE_PERIOD = 60*60*SHORT_LEASE_PERIOD;
+  
+  /** start a dfs cluster
+   * 
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void startUp() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt("dfs.heartbeat.interval", 1);
-  //  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 16);
 
-    // create fake mapping user to group and set it to the conf
-    // NOTE. this must be done at the beginning, before first call to mapping
-    // functions
+    cluster = new MiniDFSCluster(conf, 5, true, null);
+    cluster.waitActive();
+    dfs = (DistributedFileSystem)cluster.getFileSystem();
+  }
+  
+  /**
+   * stop the cluster
+   * @throws IOException
+   */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    IOUtils.closeStream(dfs);
+    if (cluster != null) {cluster.shutdown();}
+  }
+  
+  /**
+   * This test makes the client does not renew its lease and also
+   * set the hard lease expiration period to be short 1s. Thus triggering
+   * lease expiration to happen while the client is still alive.
+   * 
+   * The test makes sure that the lease recovery completes and the client
+   * fails if it continues to write to the file.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testHardLeaseRecovery() throws Exception {
+    //create a file
+    String filestr = "/hardLeaseRecovery";
+    AppendTestUtil.LOG.info("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true,
+        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    // write bytes into the file.
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    AppendTestUtil.LOG.info("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+    
+    // kill the lease renewal thread
+    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+    dfs.dfs.leasechecker.interruptAndJoin();
+
+    // set the hard limit to be 1 second 
+    cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
+    
+    // wait for lease recovery to complete
+    LocatedBlocks locatedBlocks;
+    do {
+      Thread.sleep(SHORT_LEASE_PERIOD);
+      locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
+        filestr, 0L, size);
+    } while (locatedBlocks.isUnderConstruction());
+    assertEquals(size, locatedBlocks.getFileLength());
+
+    // make sure that the writer thread gets killed
+    try {
+      stm.write('b');
+      stm.close();
+      fail("Writer thread should have been killed");
+    } catch (IOException e) {
+      e.printStackTrace();
+    }      
+
+    // verify data
+    AppendTestUtil.LOG.info(
+        "File size is good. Now validating sizes from datanodes...");
+    AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
+  }
+  
+  /**
+   * This test makes the client does not renew its lease and also
+   * set the soft lease expiration period to be short 1s. Thus triggering
+   * soft lease expiration to happen immediately by having another client
+   * trying to create the same file.
+   * 
+   * The test makes sure that the lease recovery completes.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testSoftLeaseRecovery() throws Exception {
     Map<String, String []> u2g_map = new HashMap<String, String []>(1);
     u2g_map.put(fakeUsername, new String[] {fakeGroup});
     DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
 
-    MiniDFSCluster cluster = null;
-    DistributedFileSystem dfs = null;
-    byte[] actual = new byte[FILE_SIZE];
-
-    try {
-      cluster = new MiniDFSCluster(conf, 5, true, null);
-      cluster.waitActive();
-
-      //create a file
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
-      // create a random file name
-      String filestr = "/foo" + AppendTestUtil.nextInt();
-      System.out.println("filestr=" + filestr);
-      Path filepath = new Path(filestr);
-      FSDataOutputStream stm = dfs.create(filepath, true,
-          bufferSize, repl, BLOCK_SIZE);
-      assertTrue(dfs.dfs.exists(filestr));
-
-      // write random number of bytes into it.
-      int size = AppendTestUtil.nextInt(FILE_SIZE);
-      System.out.println("size=" + size);
-      stm.write(buffer, 0, size);
-
-      // hflush file
-      AppendTestUtil.LOG.info("hflush");
-      stm.hflush();
-      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leasechecker.interruptAndJoin();
-
-      // set the soft limit to be 1 second so that the
-      // namenode triggers lease recovery on next attempt to write-for-open.
-      cluster.setLeasePeriod(softLease, hardLease);
-
-      // try to re-open the file before closing the previous handle. This
-      // should fail but will trigger lease recovery.
-      {
-        UserGroupInformation ugi = 
-          UserGroupInformation.createUserForTesting(fakeUsername, 
-                                                    new String [] { fakeGroup});
-        
-        FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
-  
-        boolean done = false;
-        for(int i = 0; i < 10 && !done; i++) {
-          AppendTestUtil.LOG.info("i=" + i);
-          try {
-            dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
-            fail("Creation of an existing file should never succeed.");
-          } catch (IOException ioe) {
-            final String message = ioe.getMessage();
-            if (message.contains("file exists")) {
-              AppendTestUtil.LOG.info("done", ioe);
-              done = true;
-            }
-            else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName()))
{
-              AppendTestUtil.LOG.info("GOOD! got " + message);
-            }
-            else {
-              AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
-            }
+    //create a file
+    // create a random file name
+    String filestr = "/foo" + AppendTestUtil.nextInt();
+    AppendTestUtil.LOG.info("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true,
+        BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    // write random number of bytes into it.
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    AppendTestUtil.LOG.info("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+    dfs.dfs.leasechecker.interruptAndJoin();
+
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery on next attempt to write-for-open.
+    cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);
+
+    // try to re-open the file before closing the previous handle. This
+    // should fail but will trigger lease recovery.
+    {
+      UserGroupInformation ugi = 
+        UserGroupInformation.createUserForTesting(fakeUsername, 
+            new String [] { fakeGroup});
+
+      FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
+
+      boolean done = false;
+      for(int i = 0; i < 10 && !done; i++) {
+        AppendTestUtil.LOG.info("i=" + i);
+        try {
+          dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+          fail("Creation of an existing file should never succeed.");
+        } catch (IOException ioe) {
+          final String message = ioe.getMessage();
+          if (message.contains("file exists")) {
+            AppendTestUtil.LOG.info("done", ioe);
+            done = true;
           }
-
-          if (!done) {
-            AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
-            try {Thread.sleep(5000);} catch (InterruptedException e) {}
+          else if (message.contains(
+              AlreadyBeingCreatedException.class.getSimpleName())) {
+            AppendTestUtil.LOG.info("GOOD! got " + message);
+          }
+          else {
+            AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
           }
         }
-        assertTrue(done);
-      }
 
-      AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
-          + "Validating its contents now...");
-
-      // verify that file-size matches
-      long fileSize = dfs.getFileStatus(filepath).getLen();
-      assertTrue("File should be " + size + " bytes, but is actually " +
-                 " found to be " + fileSize + " bytes", fileSize == size);
-
-      // verify that there is enough data to read.
-      System.out.println("File size is good. Now validating sizes from datanodes...");
-      FSDataInputStream stmin = dfs.open(filepath);
-      stmin.readFully(0, actual, 0, size);
-      stmin.close();
-    }
-    finally {
-      try {
-        if(dfs != null) dfs.close();
-        if (cluster != null) {cluster.shutdown();}
-      } catch (Exception e) {
-        // ignore
+        if (!done) {
+          AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+          try {Thread.sleep(5000);} catch (InterruptedException e) {}
+        }
       }
+      assertTrue(done);
     }
+
+    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
+        + "Validating its contents now...");
+
+    // verify that file-size matches
+    long fileSize = dfs.getFileStatus(filepath).getLen();
+    assertTrue("File should be " + size + " bytes, but is actually " +
+        " found to be " + fileSize + " bytes", fileSize == size);
+
+    // verify data
+    AppendTestUtil.LOG.info("File size is good. " +
+                     "Now validating data and sizes from datanodes...");
+    AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
   }
 }

Added: hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=926469&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
(added)
+++ hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Tue Mar 23 06:00:11 2010
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * This tests if sync all replicas in block recovery works correctly
+ */
+public class TestBlockRecovery {
+  private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
+  private static final String DATA_DIR =
+    MiniDFSCluster.getBaseDirectory() + "data";
+  private DataNode dn;
+  private Configuration conf;
+  private final static long RECOVERY_ID = 3000L;
+  private final static long BLOCK_ID = 1000L;
+  private final static long GEN_STAMP = 2000L;
+  private final static long BLOCK_LEN = 3000L;
+  private final static long REPLICA_LEN1 = 6000L;
+  private final static long REPLICA_LEN2 = 5000L;
+  private final static Block block = new Block(BLOCK_ID, BLOCK_LEN, GEN_STAMP);
+
+  static {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * Starts an instance of DataNode
+   * @throws IOException
+   */
+  @Before
+  public void startUp() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    ArrayList<File> dirs = new ArrayList<File>();
+    File dataDir = new File(DATA_DIR);
+    FileUtil.fullyDelete(dataDir);
+    dataDir.mkdirs();
+    dirs.add(dataDir);
+    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    when(namenode.versionRequest()).thenReturn(new NamespaceInfo(1, 1L, 1));
+    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(), 
+        anyLong(), anyLong(), anyInt(), anyInt())).thenReturn(
+            new DatanodeCommand[0]);
+    dn = new DataNode(conf, dirs, namenode);
+  }
+
+  /**
+   * Cleans the resources and closes the instance of datanode
+   * @throws IOException if an error occurred
+   */
+  @After
+  public void tearDown() throws IOException {
+    if (dn != null) {
+      try {
+        dn.shutdown();
+      } catch(Exception e) {
+        LOG.error("Cannot close: ", e);
+      } finally {
+        File dir = new File(DATA_DIR);
+        if (dir.exists())
+          Assert.assertTrue(
+              "Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
+      }
+    }
+  }
+
+  /** Sync two replicas */
+  private void testSyncReplicas(ReplicaRecoveryInfo replica1, 
+      ReplicaRecoveryInfo replica2,
+      InterDatanodeProtocol dn1,
+      InterDatanodeProtocol dn2) throws IOException {
+    
+    DatanodeInfo[] locs = new DatanodeInfo[]{
+        mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
+    RecoveringBlock rBlock = new RecoveringBlock(block, 
+        locs, RECOVERY_ID);
+    ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
+    BlockRecord record1 = new BlockRecord(
+        new DatanodeID("xx", "yy", 44, 55), dn1, replica1);
+    BlockRecord record2 = new BlockRecord(
+        new DatanodeID("aa", "bb", 11, 22), dn2, replica2);
+    syncList.add(record1);
+    syncList.add(record2);
+    dn.syncBlock(rBlock, syncList);
+  }
+  
+  /**
+   * BlockRecovery_02.8.
+   * Two replicas are in Finalized state
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testFinalizedReplicas () throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);    
+
+    // two finalized replicas have different length
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
+
+    try {
+      testSyncReplicas(replica1, replica2, dn1, dn2);
+      Assert.fail("Two finalized replicas should not have different lengthes!");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          "Inconsistent size of finalized replicas. "));
+    }
+  }
+  
+  /**
+   * BlockRecovery_02.9.
+   * One replica is Finalized and another is RBW. 
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testFinalizedRbwReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    
+    // rbw and finalized replicas have the same length
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    
+    // rbw replica has a different length from the finalized one
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+    dn1 = mock(InterDatanodeProtocol.class);
+    dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+  }
+  
+  /**
+   * BlockRecovery_02.10.
+   * One replica is Finalized and another is RWR. 
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testFinalizedRwrReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    
+    // rbw and finalized replicas have the same length
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+    
+    // rbw replica has a different length from the finalized one
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+    dn1 = mock(InterDatanodeProtocol.class);
+    dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+  }
+  
+  /**
+   * BlockRecovery_02.11.
+   * Two replicas are RBW.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testRBWReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
+  }
+  
+  /**
+   * BlockRecovery_02.12.
+   * One replica is RBW and another is RWR. 
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testRBW_RWRReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);    
+  }
+  
+  /**
+   * BlockRecovery_02.13. 
+   * Two replicas are RWR.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testRWRReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    
+    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
+  }  
+}



Mime
View raw message