hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1167425 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/protocol/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/datanode/
Date Sat, 10 Sep 2011 00:56:41 GMT
Author: suresh
Date: Sat Sep 10 00:56:41 2011
New Revision: 1167425

URL: http://svn.apache.org/viewvc?rev=1167425&view=rev
Log:
HDFS-1186. DNs should interrupt writers at start of recovery. Contributed by Todd Lipcon.


Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep 10 00:56:41 2011
@@ -119,6 +119,9 @@ Release 0.20.205.0 - unreleased
     HDFS-1218. Blocks recovered on startup should be treated with lower 
     priority during block synchronization. (Todd Lipcon via suresh)
 
+    HDFS-1186. DNs should interrupt writers at start of recovery.
+    (Todd Lipcon via suresh)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 10 00:56:41 2011
@@ -1660,8 +1660,8 @@ public class DataNode extends Configured
   }
   
   @Override
-  public BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException {
-    return data.getBlockRecoveryInfo(block.getBlockId());
+  public BlockRecoveryInfo startBlockRecovery(Block block) throws IOException {
+    return data.startBlockRecovery(block.getBlockId());
   }
 
   public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
@@ -1769,7 +1769,7 @@ public class DataNode extends Configured
         try {
           InterDatanodeProtocol datanode = dnRegistration.equals(id)?
               this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
-          BlockRecoveryInfo info = datanode.getBlockRecoveryInfo(block);
+          BlockRecoveryInfo info = datanode.startBlockRecovery(block);
           if (info == null) {
             LOG.info("No block metadata found for block " + block + " on datanode "
                 + id);

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 10 00:56:41 2011
@@ -1134,32 +1134,39 @@ public class FSDataset implements FSCons
         return;
       }
 
-      // interrupt and wait for all ongoing create threads
-      for(Thread t : threads) {
-        t.interrupt();
-      }
-      for(Thread t : threads) {
-        try {
-          t.join();
-        } catch (InterruptedException e) {
-          DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
-          break; // retry with new threadlist from the beginning
-        }
+      interruptAndJoinThreads(threads);
+    }
+  }
+
+  /**
+   * Try to interrupt all of the given threads, and join on them.
+   * If interrupted, returns false, indicating some threads may
+   * still be running.
+   */
+  private boolean interruptAndJoinThreads(List<Thread> threads) {
+    // interrupt and wait for all ongoing create threads
+    for(Thread t : threads) {
+      t.interrupt();
+    }
+    for(Thread t : threads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
+        return false;
       }
     }
+    return true;
   }
 
+  
   /**
-   * Try to update an old block to a new block.
-   * If there are ongoing create threads running for the old block,
-   * the threads will be returned without updating the block. 
-   * 
-   * @return ongoing create threads if there is any. Otherwise, return null.
+   * Return a list of active writer threads for the given block.
+   * @return null if there are no such threads or the file is
+   * not being created
    */
-  private synchronized List<Thread> tryUpdateBlock(
-      Block oldblock, Block newblock) throws IOException {
-    //check ongoing create threads
-    final ActiveFile activefile = ongoingCreates.get(oldblock);
+  private synchronized ArrayList<Thread> getActiveThreads(Block block) {
+    final ActiveFile activefile = ongoingCreates.get(block);
     if (activefile != null && !activefile.threads.isEmpty()) {
       //remove dead threads
       for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
@@ -1168,13 +1175,30 @@ public class FSDataset implements FSCons
           i.remove();
         }
       }
-
+  
       //return living threads
       if (!activefile.threads.isEmpty()) {
         return new ArrayList<Thread>(activefile.threads);
       }
     }
-
+    return null;
+  }
+  
+  /**
+   * Try to update an old block to a new block.
+   * If there are ongoing create threads running for the old block,
+   * the threads will be returned without updating the block. 
+   * 
+   * @return ongoing create threads if there is any. Otherwise, return null.
+   */
+  private synchronized List<Thread> tryUpdateBlock(
+      Block oldblock, Block newblock) throws IOException {
+    //check ongoing create threads
+    ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
+    if (activeThreads != null) {
+      return activeThreads;
+    }
+    
     //No ongoing create threads is alive.  Update block.
     File blockFile = findBlockFile(oldblock.getBlockId());
     if (blockFile == null) {
@@ -1945,30 +1969,42 @@ public class FSDataset implements FSCons
   }
 
   @Override
-  public synchronized  BlockRecoveryInfo getBlockRecoveryInfo(long blockId) 
-      throws IOException {
+  public BlockRecoveryInfo startBlockRecovery(long blockId) 
+      throws IOException {    
     Block stored = getStoredBlock(blockId);
 
     if (stored == null) {
       return null;
     }
     
-    ActiveFile activeFile = ongoingCreates.get(stored);
-    boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
-    
+    // It's important that this loop not be synchronized - otherwise
+    // this will deadlock against the thread it's joining against!
+    while (true) {
+      DataNode.LOG.debug(
+          "Interrupting active writer threads for block " + stored);
+      List<Thread> activeThreads = getActiveThreads(stored);
+      if (activeThreads == null) break;
+      if (interruptAndJoinThreads(activeThreads))
+        break;
+    }
     
-    BlockRecoveryInfo info = new BlockRecoveryInfo(
-        stored, isRecovery);
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
-                " length " + stored.getNumBytes() +
-                " genstamp " + stored.getGenerationStamp());
+    synchronized (this) {
+      ActiveFile activeFile = ongoingCreates.get(stored);
+      boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
+      
+      
+      BlockRecoveryInfo info = new BlockRecoveryInfo(
+          stored, isRecovery);
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
+                  " length " + stored.getNumBytes() +
+                  " genstamp " + stored.getGenerationStamp());
+      }
+  
+      // paranoia! verify that the contents of the stored block
+      // matches the block file on disk.
+      validateBlockMetadata(stored);
+      return info;
     }
-
-    // paranoia! verify that the contents of the stored block
-    // matches the block file on disk.
-    validateBlockMetadata(stored);
-
-    return info;
   }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Sep 10 00:56:41 2011
@@ -308,5 +308,5 @@ public interface FSDatasetInterface exte
    */
   public boolean hasEnoughResource();
 
-  public BlockRecoveryInfo getBlockRecoveryInfo(long blockId) throws IOException;
+  public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Sat Sep 10 00:56:41 2011
@@ -46,10 +46,12 @@ public interface InterDatanodeProtocol e
   BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
 
   /**
+   * Begin recovery on a block - this interrupts writers and returns the
+   * necessary metadata for recovery to begin.
    * @return the BlockRecoveryInfo for a block
    * @return null if the block is not found
    */
-  BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException;
+  BlockRecoveryInfo startBlockRecovery(Block block) throws IOException;
   
   /**
    * Update the block to the new generation stamp and length.  

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java Sat Sep 10 00:56:41 2011
@@ -21,15 +21,20 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /** Utilities for append-related tests */ 
@@ -114,4 +119,85 @@ class AppendTestUtil {
       throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
     }
   }
+  
+  static class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+    private final byte[] toWrite;
+    private AtomicInteger numWritten = new AtomicInteger();
+    
+    public WriterThread(FSDataOutputStream stm,
+        byte[] toWrite,
+        AtomicReference<Throwable> thrown,
+        CountDownLatch countdown, int numWrites) {
+      this.toWrite = toWrite;
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+  
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+          numWritten.getAndIncrement();
+        }
+      } catch (Throwable t) {
+        thrown.compareAndSet(null, t);
+      }
+    }
+  
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.sync();
+    }
+    
+    public int getNumWritten() {
+      return numWritten.get();
+    }
+  }
+
+  public static void loseLeases(FileSystem whichfs) throws Exception {
+    LOG.info("leasechecker.interruptAndJoin()");
+    // lose the lease on the client
+    DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
+    dfs.dfs.leasechecker.interruptAndJoin();
+  }
+  
+  public static void recoverFile(MiniDFSCluster cluster, FileSystem fs,
+      Path file1) throws IOException {
+    
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery upon append request
+    cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+    // Trying recovery
+    int tries = 60;
+    boolean recovered = false;
+    FSDataOutputStream out = null;
+    while (!recovered && tries-- > 0) {
+      try {
+        out = fs.append(file1);
+        LOG.info("Successfully opened for appends");
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    if (out != null) {
+      out.close();
+    }
+    if (!recovered) {
+      throw new RuntimeException("Recovery failed");
+    }    
+  }  
 }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Sat Sep 10 00:56:41 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImageAdapter;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import static org.apache.hadoop.hdfs.AppendTestUtil.loseLeases;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -146,13 +147,6 @@ public class TestFileAppend4 extends Tes
     assertTrue(file1 + " should be replicated to " + rep + " datanodes, not " +
                actualRepl + ".", actualRepl == rep);
   }
-  
-  private void loseLeases(FileSystem whichfs) throws Exception {
-    LOG.info("leasechecker.interruptAndJoin()");
-    // lose the lease on the client
-    DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
-    dfs.dfs.leasechecker.interruptAndJoin();
-  }
 
   /*
    * Recover file.
@@ -166,42 +160,7 @@ public class TestFileAppend4 extends Tes
    */
   private void recoverFile(final FileSystem fs) throws Exception {
     LOG.info("Recovering File Lease");
-
-    // set the soft limit to be 1 second so that the
-    // namenode triggers lease recovery upon append request
-    cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
-
-    // Trying recovery
-    int tries = 60;
-    boolean recovered = false;
-    FSDataOutputStream out = null;
-    while (!recovered && tries-- > 0) {
-      try {
-        out = fs.append(file1);
-        LOG.info("Successfully opened for appends");
-        recovered = true;
-      } catch (IOException e) {
-        LOG.info("Failed open for append, waiting on lease recovery");
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ex) {
-          // ignore it and try again
-        }
-      }
-    }
-    if (out != null) {
-      try {
-        out.close();
-        LOG.info("Successfully obtained lease");
-      } catch (IOException e) {
-        LOG.info("Unable to close file after opening for appends. " + e);
-        recovered = false;
-      }
-//      out.close();
-    }
-    if (!recovered) {
-      fail((tries > 0) ? "Recovery failed" : "Recovery should take < 1 min");
-    }
+    AppendTestUtil.recoverFile(cluster, fs, file1);
     LOG.info("Past out lease recovery");
   }
   

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java Sat Sep 10 00:56:41 2011
@@ -67,39 +67,6 @@ public class TestMultiThreadedSync {
     toWrite = AppendTestUtil.randomBytes(seed, size);
   }
 
-  private class WriterThread extends Thread {
-    private final FSDataOutputStream stm;
-    private final AtomicReference<Throwable> thrown;
-    private final int numWrites;
-    private final CountDownLatch countdown;
-
-    public WriterThread(FSDataOutputStream stm,
-      AtomicReference<Throwable> thrown,
-      CountDownLatch countdown, int numWrites) {
-      this.stm = stm;
-      this.thrown = thrown;
-      this.numWrites = numWrites;
-      this.countdown = countdown;
-    }
-
-    public void run() {
-      try {
-        countdown.await();
-        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
-          doAWrite();
-        }
-      } catch (Throwable t) {
-        thrown.compareAndSet(null, t);
-      }
-    }
-
-    private void doAWrite() throws IOException {
-      stm.write(toWrite);
-      stm.sync();
-    }
-  }
-
-
   @Test
   public void testMultipleSyncers() throws Exception {
     Configuration conf = new Configuration();
@@ -206,7 +173,7 @@ public class TestMultiThreadedSync {
     ArrayList<Thread> threads = new ArrayList<Thread>();
     AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
     for (int i = 0; i < numThreads; i++) {
-      Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+      Thread t = new AppendTestUtil.WriterThread(stm, toWrite, thrown, countdown, numWrites);
       threads.add(t);
       t.start();
     }

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java?rev=1167425&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java Sat Sep 10 00:56:41 2011
@@ -0,0 +1,77 @@
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil.WriterThread;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSyncingWriterInterrupted {
+  static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean("dfs.support.append", true);
+    conf.setInt("dfs.client.block.recovery.retries", 1);
+  }
+  
+  @Test(timeout=90000)
+  public void testWriterInterrupted() throws Exception {
+    short repl = 3;
+    int numWrites = 20000;
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, repl, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());    
+    
+    Path path = new Path("/testWriterInterrupted");
+    FSDataOutputStream stm = fs1.create(path);
+    byte[] toWrite = AppendTestUtil.randomBytes(0, 5);
+    
+    CountDownLatch countdown = new CountDownLatch(1);
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    WriterThread writerThread = new AppendTestUtil.WriterThread(
+        stm, toWrite, thrown, countdown, numWrites);
+    writerThread.start();
+    countdown.countDown();
+    while (writerThread.getNumWritten() == 0 &&
+        thrown.get() == null &&
+        writerThread.isAlive()) {
+      System.err.println("Waiting for writer to start");
+      Thread.sleep(10);
+    }
+    assertTrue(writerThread.isAlive());    
+    if (thrown.get() != null) {
+      throw new RuntimeException(thrown.get());
+    }
+    
+    AppendTestUtil.loseLeases(fs1);    
+    AppendTestUtil.recoverFile(cluster, fs2, path);
+
+    while (thrown.get() == null) {
+      LOG.info("Waiting for writer thread to get expected exception");
+      Thread.sleep(1000);
+    }
+    assertNotNull(thrown.get());
+    
+    // Check that we can see all of the synced edits
+    int expectedEdits = writerThread.getNumWritten();
+    int gotEdits = (int)(fs2.getFileStatus(path).getLen() / toWrite.length);
+    assertTrue("Expected at least " + expectedEdits +
+        " edits, got " + gotEdits, gotEdits >= expectedEdits);
+    
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Sep 10 00:56:41 2011
@@ -690,7 +690,7 @@ public class SimulatedFSDataset  impleme
   }
 
   @Override
-  public BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
+  public BlockRecoveryInfo startBlockRecovery(long blockId)
       throws IOException {
     Block stored = getStoredBlock(blockId);
     return new BlockRecoveryInfo(stored, false);



Mime
View raw message