hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r805158 - in /hadoop/common/branches/branch-0.20: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Date Mon, 17 Aug 2009 21:45:03 GMT
Author: szetszwo
Date: Mon Aug 17 21:45:02 2009
New Revision: 805158

URL: http://svn.apache.org/viewvc?rev=805158&view=rev
Log:
HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.  Contributed by Bill
Zeller

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=805158&r1=805157&r2=805158&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Mon Aug 17 21:45:02 2009
@@ -209,6 +209,9 @@
     MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
     the JobTracker lock hierarchy wasn't maintained in some JobInProgress
     method calls. (Amar Kamat via ddas)
+
+    HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+    (Bill Zeller via szetszwo)
  
 Release 0.20.0 - 2009-04-15
 

Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=805158&r1=805157&r2=805158&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Mon
Aug 17 21:45:02 2009
@@ -67,8 +67,8 @@
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  public final ClientProtocol namenode;
-  final private ClientProtocol rpcNamenode;
+  public ClientProtocol namenode;
+  private ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
   Random r = new Random();
@@ -155,6 +155,31 @@
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
                    FileSystem.Statistics stats)
     throws IOException {
+    this(conf, stats);
+    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+    this.namenode = createNamenode(this.rpcNamenode);
+  }
+
+  /** 
+   * Create a new DFSClient connected to the given namenode
+   * and rpcNamenode objects.
+   * 
+   * This constructor was written to allow easy testing of the DFSClient class.
+   * End users will most likely want to use one of the other constructors.
+   */
+  public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+                   Configuration conf, FileSystem.Statistics stats)
+    throws IOException {
+      this(conf, stats);
+      this.namenode = namenode;
+      this.rpcNamenode = rpcNamenode;
+  }
+
+  
+  private DFSClient(Configuration conf, FileSystem.Statistics stats)
+    throws IOException {      
+      
+      
     this.conf = conf;
     this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
@@ -174,9 +199,6 @@
       throw (IOException)(new IOException().initCause(e));
     }
 
-    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-    this.namenode = createNamenode(rpcNamenode);
-
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
       this.clientName = "DFSClient_" + taskId; 
@@ -2870,7 +2892,7 @@
   
     private LocatedBlock locateFollowingBlock(long start
                                               ) throws IOException {     
-      int retries = 5;
+      int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
       long sleeptime = 400;
       while (true) {
         long localstart = System.currentTimeMillis();
@@ -2887,25 +2909,30 @@
               throw ue; // no need to retry these exceptions
             }
             
-            if (--retries == 0 && 
-                !NotReplicatedYetException.class.getName().
+            if (NotReplicatedYetException.class.getName().
                 equals(e.getClassName())) {
-              throw e;
+
+                if (retries == 0) { 
+                  throw e;
+                } else {
+                  --retries;
+                  LOG.info(StringUtils.stringifyException(e));
+                  if (System.currentTimeMillis() - localstart > 5000) {
+                    LOG.info("Waiting for replication for "
+                        + (System.currentTimeMillis() - localstart) / 1000
+                        + " seconds");
+                  }
+                  try {
+                    LOG.warn("NotReplicatedYetException sleeping " + src
+                        + " retries left " + retries);
+                    Thread.sleep(sleeptime);
+                    sleeptime *= 2;
+                  } catch (InterruptedException ie) {
+                  }
+                }
             } else {
-              LOG.info(StringUtils.stringifyException(e));
-              if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Waiting for replication for " + 
-                         (System.currentTimeMillis() - localstart)/1000 + 
-                         " seconds");
-              }
-              try {
-                LOG.warn("NotReplicatedYetException sleeping " + src +
-                          " retries left " + retries);
-                Thread.sleep(sleeptime);
-                sleeptime *= 2;
-              } catch (InterruptedException ie) {
-              }
-            }                
+              throw e;
+            }
           }
         }
       } 

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=805158&r1=805157&r2=805158&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
(original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Mon Aug 17 21:45:02 2009
@@ -21,10 +21,18 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
 
 import junit.framework.TestCase;
 
@@ -34,6 +42,8 @@
  * properly in case of errors.
  */
 public class TestDFSClientRetries extends TestCase {
+  public static final Log LOG =
+    LogFactory.getLog(TestDFSClientRetries.class.getName());
   
   // writes 'len' bytes of data to out.
   private static void writeData(OutputStream out, int len) throws IOException {
@@ -97,4 +107,132 @@
   }
   
   // more tests related to different failure cases can be added here.
+  
+  class TestNameNode implements ClientProtocol
+  {
+    int num_calls = 0;
+    
+    // The total number of calls that can be made to addBlock
+    // before an exception is thrown
+    int num_calls_allowed; 
+    public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from"
+                                             + "TestDFSClientRetries::"
+                                             + "TestNameNode::addBlock";
+    public final String RETRY_CONFIG
+          = "dfs.client.block.write.locateFollowingBlock.retries";
+          
+    public TestNameNode(Configuration conf) throws IOException
+    {
+      // +1 because the configuration value is the number of retries and
+      // the first call is not a retry (e.g., 2 retries == 3 total
+      // calls allowed)
+      this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
+    }
+
+    public long getProtocolVersion(String protocol, 
+                                     long clientVersion)
+    throws IOException
+    {
+      return versionID;
+    }
+
+    public LocatedBlock addBlock(String src, String clientName)
+    throws IOException
+    {
+      num_calls++;
+      if (num_calls > num_calls_allowed) { 
+        throw new IOException("addBlock called more times than "
+                              + RETRY_CONFIG
+                              + " allows.");
+      } else {
+          throw new RemoteException(NotReplicatedYetException.class.getName(),
+                                    ADD_BLOCK_EXCEPTION);
+      }
+    }
+    
+    
+    // The following methods are stub methods that are not needed by this mock class
+
+    public LocatedBlocks  getBlockLocations(String src, long offset, long length) throws
IOException { return null; }
+
+    public void create(String src, FsPermission masked, String clientName, boolean overwrite,
short replication, long blockSize) throws IOException {}
+
+    public LocatedBlock append(String src, String clientName) throws IOException { return
null; }
+
+    public boolean setReplication(String src, short replication) throws IOException { return
false; }
+
+    public void setPermission(String src, FsPermission permission) throws IOException {}
+
+    public void setOwner(String src, String username, String groupname) throws IOException
{}
+
+    public void abandonBlock(Block b, String src, String holder) throws IOException {}
+
+    public boolean complete(String src, String clientName) throws IOException { return false;
}
+
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
+
+    public boolean rename(String src, String dst) throws IOException { return false; }
+
+    public boolean delete(String src) throws IOException { return false; }
+
+    public boolean delete(String src, boolean recursive) throws IOException { return false;
}
+
+    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false;
}
+
+    public FileStatus[] getListing(String src) throws IOException { return null; }
+
+    public void renewLease(String clientName) throws IOException {}
+
+    public long[] getStats() throws IOException { return null; }
+
+    public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException
{ return null; }
+
+    public long getPreferredBlockSize(String filename) throws IOException { return 0; }
+
+    public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException { return
false; }
+
+    public void saveNamespace() throws IOException {}
+
+    public boolean restoreFailedStorage(String arg) throws AccessControlException { return
false; }
+
+    public void refreshNodes() throws IOException {}
+
+    public void finalizeUpgrade() throws IOException {}
+
+    public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException
{ return null; }
+
+    public void metaSave(String filename) throws IOException {}
+
+    public FileStatus getFileInfo(String src) throws IOException { return null; }
+
+    public ContentSummary getContentSummary(String path) throws IOException { return null;
}
+
+    public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException
{}
+
+    public void fsync(String src, String client) throws IOException {}
+
+    public void setTimes(String src, long mtime, long atime) throws IOException {}
+
+  }
+  
+  public void testNotYetReplicatedErrors() throws IOException
+  {   
+    Configuration conf = new Configuration();
+    
+    // allow 1 retry (2 total calls)
+    conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
+        
+    TestNameNode tnn = new TestNameNode(conf);
+    DFSClient client = new DFSClient(tnn, tnn, conf, null);
+    OutputStream os = client.create("testfile", true);
+    os.write(20); // write one random byte
+    
+    try {
+      os.close();
+    } catch (Exception e) {
+      assertTrue("Retries are not being stopped correctly",
+           e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
+    }
+  }
+  
 }



Mime
View raw message