hadoop-hdfs-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r908203 - in /hadoop/hdfs/branches/branch-0.21: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Date: Tue, 09 Feb 2010 20:23:18 GMT
Author: stack
Date: Tue Feb  9 20:23:17 2010
New Revision: 908203

URL: http://svn.apache.org/viewvc?rev=908203&view=rev
Log:
HDFS-927 DFSInputStream retries too many times for new block locations

Modified:
    hadoop/hdfs/branches/branch-0.21/CHANGES.txt
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=908203&r1=908202&r2=908203&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Tue Feb  9 20:23:17 2010
@@ -546,6 +546,9 @@
               exclude specific datanodes when locating the next block.
               (Cosmin Lehene via Stack)
 
+    HDFS-927. DFSInputStream retries too many times for new block locations
+    (Todd Lipcon via Stack)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=908203&r1=908202&r2=908203&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Feb  9 20:23:17 2010
@@ -257,9 +257,7 @@
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
 		                       DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    this.maxBlockAcquireFailures = 
-                          conf.getInt("dfs.client.max.block.acquire.failures",
-                                      MAX_BLOCK_ACQUIRE_FAILURES);
+    this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
 
@@ -334,6 +332,11 @@
     }
   }
 
+  static int getMaxBlockAcquireFailures(Configuration conf) {
+    return conf.getInt("dfs.client.max.block.acquire.failures",
+                       MAX_BLOCK_ACQUIRE_FAILURES);
+  }
+
   /**
    * Get server default values for a number of configuration params.
    */
@@ -1613,7 +1616,7 @@
    * DFSInputStream provides bytes from a named file.  It handles 
    * negotiation of the namenode and various datanodes as necessary.
    ****************************************************************/
-  private class DFSInputStream extends FSInputStream {
+  class DFSInputStream extends FSInputStream {
     private Socket s = null;
     private boolean closed = false;
 
@@ -1627,6 +1630,18 @@
     private Block currentBlock = null;
     private long pos = 0;
     private long blockEnd = -1;
+
+    /**
+     * This variable tracks the number of failures since the start of the
+     * most recent user-facing operation. That is to say, it should be reset
+     * whenever the user makes a call on this stream, and if at any point
+     * during the retry logic, the failure count exceeds a threshold,
+     * the errors will be thrown back to the operation.
+     *
+     * Specifically this counts the number of times the client has gone
+     * back to the namenode to get a new list of block locations, and is
+     * capped at maxBlockAcquireFailures
+     */
     private int failures = 0;
 
     /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
@@ -1887,7 +1902,6 @@
       //
       DatanodeInfo chosenNode = null;
       int refetchToken = 1; // only need to get a new access token once
-      failures = 0;
       
       while (true) {
         //
@@ -2038,6 +2052,7 @@
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
       if (pos < getFileLength()) {
         int retries = 2;
         while (retries > 0) {
@@ -2118,7 +2133,6 @@
       //
       Socket dn = null;
       int refetchToken = 1; // only need to get a new access token once
-      failures = 0;
       
       while (true) {
         // cached block locations may have been updated by chooseDataNode()
@@ -2195,6 +2209,7 @@
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
       long filelen = getFileLength();
       if ((position < 0) || (position >= filelen)) {
         return -1;
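
For context on the change above: the comment added to DFSInputStream describes the new contract for the failures counter — it is reset once per user-facing operation (read() and the positioned read path) rather than inside the internal retry loops, and it is capped by dfs.client.max.block.acquire.failures, now read through the new getMaxBlockAcquireFailures(conf) helper. Below is a minimal, self-contained sketch of that per-operation reset pattern; RetryingStream, fetchFromBestSource and refetchLocations are hypothetical names used only for illustration and are not part of the HDFS API.

import java.io.IOException;

/** Illustrative sketch only; mirrors the counter-reset pattern in the diff above. */
public class RetryingStream {
  private final int maxFailures;   // analogous to dfs.client.max.block.acquire.failures
  private int failures = 0;        // counts trips back for a fresh location list

  public RetryingStream(int maxFailures) {
    this.maxFailures = maxFailures;
  }

  /** User-facing call: the counter is reset here, once per operation. */
  public int read() throws IOException {
    failures = 0;                  // the per-operation reset that HDFS-927 introduces
    return readInternal();
  }

  /** Internal retry loop: the counter is only checked, never reset, here. */
  private int readInternal() throws IOException {
    while (true) {
      try {
        return fetchFromBestSource();
      } catch (IOException e) {
        if (++failures > maxFailures) {
          throw new IOException("Could not obtain data after " + failures
              + " attempts", e);
        }
        refetchLocations();        // ask for a new list of sources and retry
      }
    }
  }

  private int fetchFromBestSource() throws IOException {
    // stand-in for reading from the currently chosen datanode
    throw new IOException("simulated datanode failure");
  }

  private void refetchLocations() {
    // stand-in for going back to the namenode for fresh block locations
  }
}

Keeping the reset at the public entry points means a long-lived stream no longer accumulates failures across unrelated reads, while any single read still gives up after a bounded number of location refetches.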

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=908203&r1=908202&r2=908203&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Tue Feb  9 20:23:17 2010
@@ -20,14 +20,18 @@
 
 import java.io.File;
 import java.io.RandomAccessFile;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * A JUnit test for corrupted file handling.
@@ -56,18 +60,7 @@
  *     increase replication factor of file to 3. verify that the new 
  *     replica was created from the non-corrupted replica.
  */
-public class TestCrcCorruption extends TestCase {
-  
-  public TestCrcCorruption(String testName) {
-    super(testName);
-  }
-
-  protected void setUp() throws Exception {
-  }
-
-  protected void tearDown() throws Exception {
-  }
-  
+public class TestCrcCorruption {
   /** 
    * check if DFS can handle corrupted CRC blocks
    */
@@ -202,6 +195,7 @@
     }
   }
 
+  @Test
   public void testCrcCorruption() throws Exception {
     //
     // default parameters
@@ -222,4 +216,59 @@
     DFSTestUtil util2 = new DFSTestUtil("TestCrcCorruption", 40, 3, 400);
     thistest(conf2, util2);
   }
+
+
+  /**
+   * Make a single-DN cluster, corrupt a block, and make sure
+   * there's no infinite loop, but rather it eventually
+   * reports the exception to the client.
+   */
+  @Test(timeout=300000) // 5 min timeout
+  public void testEntirelyCorruptFileOneNode() throws Exception {
+    doTestEntirelyCorruptFile(1);
+  }
+
+  /**
+   * Same thing with multiple datanodes - in history, this has
+   * behaved differently than the above.
+   *
+   * This test usually completes in around 15 seconds - if it
+   * times out, this suggests that the client is retrying
+   * indefinitely.
+   */
+  @Test(timeout=300000) // 5 min timeout
+  public void testEntirelyCorruptFileThreeNodes() throws Exception {
+    doTestEntirelyCorruptFile(3);
+  }
+
+  private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication", numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
+      DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+
+      String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+      cluster.corruptBlockOnDataNodes(block);
+
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
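
For context: the two new cases above use the JUnit 4 timeout attribute as the actual regression guard — before this change, reading an entirely corrupt file could retry indefinitely, so the failure mode being caught is a hang that the timeout converts into a test failure. Below is a minimal sketch of that guard; RetryBoundTest and readEntirelyCorruptFile are hypothetical names, while @Test(timeout=...) and the fail()/catch(IOException) assertion style follow the tests above.

import static org.junit.Assert.fail;

import java.io.IOException;

import org.junit.Test;

public class RetryBoundTest {

  @Test(timeout=300000)  // 5 min: an indefinitely retrying client fails here instead of hanging
  public void readOfCorruptDataEventuallyThrows() throws Exception {
    try {
      readEntirelyCorruptFile();   // stand-in for opening and draining the corrupt file
      fail("Didn't get exception");
    } catch (IOException ioe) {
      // expected: the client gave up after a bounded number of retries
    }
  }

  private void readEntirelyCorruptFile() throws IOException {
    // In the real test this reads the file from a MiniDFSCluster whose replicas
    // were all corrupted; here it just simulates the eventual failure.
    throw new IOException("simulated: could not obtain block");
  }
}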

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=908203&r1=908202&r2=908203&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Feb  9 20:23:17 2010
@@ -20,22 +20,29 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.*;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
 
 import junit.framework.TestCase;
 
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
@@ -251,5 +258,126 @@
            e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
     }
   }
+
+  /**
+   * This tests that DFSInputStream failures are counted for a given read
+   * operation, and not over the lifetime of the stream. It is a regression
+   * test for HDFS-127.
+   */
+  public void testFailuresArePerOperation() throws Exception
+  {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
+    assertTrue(maxBlockAcquires > 0);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)1, 12345L /*seed*/);
+
+      // If the client will retry maxBlockAcquires times, then if we fail
+      // any more than that number of times, the operation should entirely
+      // fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires + 1))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      try {
+        IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+      // If we fail exactly that many times, then it should succeed.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                        true);
+
+      DFSClient.LOG.info("Starting test case for failure reset");
+
+      // Now the tricky case - if we fail a few times on one read, then succeed,
+      // then fail some more on another read, it shouldn't fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      DFSInputStream is = client.open(file.toString());
+      byte buf[] = new byte[10];
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+      DFSClient.LOG.info("First read successful after some failures.");
+
+      // Further reads at this point will succeed since it has the good block locations.
+      // So, force the block locations on this stream to be refreshed from bad info.
+      // When reading again, it should start from a fresh failure count, since
+      // we're starting a new operation on the user level.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      is.openInfo();
+      // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
+      // just keep reading on the existing stream and the fact that we've poisoned
+      // the block info won't do anything.
+      is.seek(0);
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Mock Answer implementation of NN.getBlockLocations that will return
+   * a poisoned block list a certain number of times before returning
+   * a proper one.
+   */
+  private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
+    private int failuresLeft;
+    private NameNode realNN;
+
+    public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+      failuresLeft = timesToFail;
+      this.realNN = realNN;
+    }
+
+    public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
+      Object args[] = invocation.getArguments();
+      LocatedBlocks realAnswer = realNN.getBlockLocations(
+        (String)args[0],
+        (Long)args[1],
+        (Long)args[2]);
+
+      if (failuresLeft-- > 0) {
+        NameNode.LOG.info("FailNTimesAnswer injecting failure.");
+        return makeBadBlockList(realAnswer);
+      }
+      NameNode.LOG.info("FailNTimesAnswer no longer failing.");
+      return realAnswer;
+    }
+
+    private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
+      LocatedBlock goodLocatedBlock = goodBlockList.get(0);
+      LocatedBlock badLocatedBlock = new LocatedBlock(
+        goodLocatedBlock.getBlock(),
+        new DatanodeInfo[] {
+          new DatanodeInfo(new DatanodeID("255.255.255.255:234"))
+        },
+        goodLocatedBlock.getStartOffset(),
+        false);
+
+
+      List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
+      badBlocks.add(badLocatedBlock);
+      return new LocatedBlocks(goodBlockList.getFileLength(), false,
+                               badBlocks, null, true);
+    }
+  }
   
 }
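
The regression test above injects failures by wrapping the real NameNode in a Mockito spy and stubbing getBlockLocations() with an Answer (FailNTimesAnswer) that returns a poisoned block list a given number of times before delegating to the real namenode. Below is a self-contained sketch of that spy-plus-Answer fault-injection pattern against a plain interface; Locator, RealLocator and FailNTimes are hypothetical names, while spy(), doAnswer(), when(), Answer and InvocationOnMock are the actual Mockito APIs used in the test.

import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class FaultInjectionSketch {

  /** A stand-in for the NameNode's getBlockLocations(). */
  interface Locator {
    String locate(String path);
  }

  static class RealLocator implements Locator {
    public String locate(String path) {
      return "good-location-for-" + path;
    }
  }

  /** Returns a poisoned answer N times, then falls through to the real object. */
  static class FailNTimes implements Answer<String> {
    private final Locator real;
    private int failuresLeft;

    FailNTimes(Locator real, int timesToFail) {
      this.real = real;
      this.failuresLeft = timesToFail;
    }

    public String answer(InvocationOnMock invocation) throws Throwable {
      String path = (String) invocation.getArguments()[0];
      if (failuresLeft-- > 0) {
        return "bad-location";     // analogous to makeBadBlockList()
      }
      return real.locate(path);    // analogous to returning realAnswer
    }
  }

  public static void main(String[] args) {
    Locator real = new RealLocator();
    Locator spied = spy(real);
    // Fail twice, then behave normally; the caller's retry logic must absorb
    // exactly that many failures within a single operation.
    doAnswer(new FailNTimes(real, 2)).when(spied).locate(anyString());

    for (int i = 0; i < 3; i++) {
      System.out.println(spied.locate("/testFile"));
    }
  }
}

Running the sketch prints two poisoned locations followed by the real one, mirroring how the test makes getBlockLocations() fail maxBlockAcquires times and then lets the read succeed.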


