hadoop-hdfs-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r894233 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/
Date Mon, 28 Dec 2009 19:45:01 GMT
Author: dhruba
Date: Mon Dec 28 19:45:00 2009
New Revision: 894233

URL: http://svn.apache.org/viewvc?rev=894233&view=rev
Log:
HDFS-767. An improved retry policy when the DFSClient is unable to fetch a
block from the datanode.  (Ning Zhang via dhruba)


Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=894233&r1=894232&r2=894233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Dec 28 19:45:00 2009
@@ -75,6 +75,9 @@
     HDFS-94. The Heap Size printed in the NameNode WebUI is accurate.
     (Dmytro Molkov via dhruba)
 
+    HDFS-767. An improved retry policy when the DFSClient is unable to fetch a
+    block from the datanode.  (Ning Zhang via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=894233&r1=894232&r2=894233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Dec 28 19:45:00 2009
@@ -1669,6 +1669,7 @@
     private long pos = 0;
     private long blockEnd = -1;
     private int failures = 0;
+    private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
 
     /* XXX Use of ConcurrentHashMap is temp fix. Need to fix
      * parallel accesses to DFSInputStream (through pthreads) properly */
@@ -1688,6 +1689,7 @@
       this.buffersize = buffersize;
       this.src = src;
       prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
+      timeWindow = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
       openInfo();
     }
 
@@ -2140,7 +2142,19 @@
               + " from any node: " + ie
               + ". Will get new block locations from namenode and retry...");
           try {
-            Thread.sleep(3000);
+            // Introduce a random factor to the wait time before another retry.
+            // The wait time depends on the number of failures and a random factor.
+            // The first time a BlockMissingException is caught, the wait time
+            // is a random number between 0..3000 ms. If the first retry
+            // still fails, we wait a 3000 ms grace period before the 2nd retry.
+            // At the second retry, the waiting window is also expanded to 6000 ms
+            // to reduce the request rate on the server. Similarly, the 3rd retry
+            // waits a 6000 ms grace period before retrying, and the waiting window
+            // is expanded to 9000 ms.
+            double waitTime = timeWindow * failures +       // grace period for the last round of attempt
+              timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
+            LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+            Thread.sleep((long)waitTime);
           } catch (InterruptedException iex) {
           }
           deadNodes.clear(); //2nd option is to remove only nodes[blockId]
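
For illustration only (not part of the patch): with the default 3000 ms window base, the formula above yields a randomized, linearly expanding wait per failure. A minimal standalone sketch of the same computation:

    import java.util.Random;

    public class RetryWindowSketch {
      public static void main(String[] args) {
        Random r = new Random();
        int timeWindow = 3000; // default base window (msec), as in the patch
        for (int failures = 0; failures < 3; failures++) {
          // Same arithmetic the patch adds to DFSInputStream's chooseDataNode path:
          double waitTime = timeWindow * failures               // fixed grace period
              + timeWindow * (failures + 1) * r.nextDouble();   // expanding random window
          System.out.printf("attempt #%d: wait in [%d, %d) ms, sampled %.0f ms%n",
              failures + 1,
              timeWindow * failures,
              timeWindow * (2 * failures + 1),
              waitTime);
        }
        // attempt #1: [0, 3000) ms; #2: [3000, 9000) ms; #3: [6000, 15000) ms
      }
    }

So the first retry is jittered across the base window, and each later retry both waits out the previous window and draws from a larger one, spreading client load on a busy block.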

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=894233&r1=894232&r2=894233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Dec 28 19:45:00 2009
@@ -95,6 +95,7 @@
   public static final String  DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
   public static final String  DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";

+  public static final String  DFS_CLIENT_RETRY_WINDOW_BASE = "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String  DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
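
The new key is read in the DFSInputStream constructor (see the DFSClient.java hunk above). A minimal usage sketch, with the value illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class RetryWindowConfigSketch {
      public static void main(String[] args) {
        // Lower the base retry window from the default 3000 ms to 1000 ms,
        // e.g. for tests or latency-sensitive readers (value illustrative).
        Configuration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 1000);
        System.out.println(conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000));
      }
    }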

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=894233&r1=894232&r2=894233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Dec 28 19:45:00 2009
@@ -811,6 +811,16 @@
   public FileSystem getFileSystem() throws IOException {
     return FileSystem.get(conf);
   }
+  
+
+  /**
+   * Get another FileSystem instance that is different from FileSystem.get(conf).
+   * This simulates different threads working on different FileSystem instances.
+   */
+  public FileSystem getNewFileSystemInstance() throws IOException {
+    return FileSystem.newInstance(conf);
+  }
+  
 
   /**
    * Get the directories where the namenode stores its image.
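
For context on why the helper is needed (a sketch, not part of the patch): FileSystem.get(conf) serves instances from a process-wide cache, so every test thread would otherwise share one client, while FileSystem.newInstance(conf) bypasses the cache:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FsInstanceSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem cachedA = FileSystem.get(conf);        // served from the FileSystem cache
        FileSystem cachedB = FileSystem.get(conf);        // same cached object
        System.out.println(cachedA == cachedB);           // true

        FileSystem fresh = FileSystem.newInstance(conf);  // bypasses the cache
        System.out.println(fresh == cachedA);             // false: a distinct instance
        fresh.close();                                    // uncached instances must be closed by the caller
      }
    }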

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=894233&r1=894232&r2=894233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Mon Dec 28 19:45:00 2009
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.security.MessageDigest;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -249,4 +251,242 @@
     }
   }
   
+  
+  /**
+   * Test that a DFSClient waits a random amount of time before retrying on busy blocks.
+   */
+  public void testDFSClientRetriesOnBusyBlocks() throws IOException {
+    
+    System.out.println("Testing DFSClient random waiting on busy blocks.");
+    
+    //
+    // Test settings: 
+    // 
+    //           xcievers    fileLen   #clients  timeWindow    #retries
+    //           ========    =======   ========  ==========    ========
+    // Test 1:          2       6 MB         50      300 ms           3
+    // Test 2:          2       6 MB         50      300 ms          50
+    // Test 3:          2       6 MB         50     1000 ms           3
+    // Test 4:          2       6 MB         50     1000 ms          50
+    // 
+    //   Minimum xcievers is 2 since 1 thread is reserved for registry.
+    //   Test 1 & 3 may fail since # retries is low. 
+    //   Test 2 & 4 should never fail since (#threads)/(xcievers-1) is the upper
+    //   bound for guarantee to not throw BlockMissingException.
+    //
+    int xcievers  = 2;
+    int fileLen   = 6*1024*1024;
+    int threads   = 50;
+    int retries   = 3;
+    int timeWin   = 300;
+    
+    //
+    // Test 1: might fail
+    // 
+    long timestamp = System.currentTimeMillis();
+    boolean pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    long timestamp2 = System.currentTimeMillis();
+    if ( pass ) {
+      LOG.info("Test 1 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+    } else {
+      LOG.warn("Test 1 failed, but relax. Time spent: " + (timestamp2-timestamp)/1000.0 +
" sec.");
+    }
+    
+    //
+    // Test 2: should never fail
+    // 
+    retries = 50;
+    timestamp = System.currentTimeMillis();
+    pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    timestamp2 = System.currentTimeMillis();
+    assertTrue("Something wrong! Test 2 got Exception with maxmum retries!", pass);
+    LOG.info("Test 2 succeeded! Time spent: "  + (timestamp2-timestamp)/1000.0 + " sec.");
+    
+    //
+    // Test 3: might fail
+    // 
+    retries = 3;
+    timeWin = 1000;
+    timestamp = System.currentTimeMillis();
+    pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    timestamp2 = System.currentTimeMillis();
+    if ( pass ) {
+      LOG.info("Test 3 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+    } else {
+      LOG.warn("Test 3 failed, but relax. Time spent: " + (timestamp2-timestamp)/1000.0 +
" sec.");
+    }
+    
+    //
+    // Test 4: should never fail
+    //
+    retries = 50;
+    timeWin = 1000;
+    timestamp = System.currentTimeMillis();
+    pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    timestamp2 = System.currentTimeMillis();
+    assertTrue("Something wrong! Test 4 got Exception with maxmum retries!", pass);
+    LOG.info("Test 4 succeeded! Time spent: "  + (timestamp2-timestamp)/1000.0 + " sec.");
+  }
+
+  private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, int retries)
+    throws IOException {
+
+    boolean ret = true;
+    short replicationFactor = 1;
+    long blockSize = 128*1024*1024; // DFS block size
+    int bufferSize = 4096;
+    
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt("dfs.datanode.max.xcievers",xcievers);
+    conf.setInt("dfs.client.max.block.acquire.failures", retries);
+    conf.setInt("dfs.client.retry.window.base", timeWin);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, replicationFactor, true, null);
+    cluster.waitActive();
+    
+    FileSystem fs = cluster.getFileSystem();
+    Path file1 = new Path("test_data.dat");
+    file1 = file1.makeQualified(fs.getUri(), fs.getWorkingDirectory()); // make URI hdfs://
+    
+    try {
+      
+      FSDataOutputStream stm = fs.create(file1, true,
+                                         bufferSize,
+                                         replicationFactor,
+                                         blockSize);
+      
+      // verify that file exists in FS namespace
+      assertTrue(file1 + " should be a file", 
+                  fs.getFileStatus(file1).isDir() == false);
+      System.out.println("Path : \"" + file1 + "\"");
+      LOG.info("Path : \"" + file1 + "\"");
+
+      // write 1 block to file
+      byte[] buffer = AppendTestUtil.randomBytes(System.currentTimeMillis(), fileLen);
+      stm.write(buffer, 0, fileLen);
+      stm.close();
+
+      // verify that file size has changed to the full size
+      long len = fs.getFileStatus(file1).getLen();
+      
+      assertTrue(file1 + " should be of size " + fileLen +
+                 " but found to be of size " + len, 
+                  len == fileLen);
+      
+      // read back and check data integrity
+      byte[] read_buf = new byte[fileLen];
+      InputStream in = fs.open(file1, fileLen);
+      IOUtils.readFully(in, read_buf, 0, fileLen);
+      assertTrue(Arrays.equals(buffer, read_buf)); // bare assert may be disabled at runtime
+      in.close();
+      read_buf = null; // GC it if needed
+      
+      // compute digest of the content to reduce memory space
+      MessageDigest m = MessageDigest.getInstance("SHA");
+      m.update(buffer, 0, fileLen);
+      byte[] hash_sha = m.digest();
+
+      // spawn multiple threads and all trying to access the same block
+      Thread[] readers = new Thread[threads];
+      Counter counter = new Counter(0);
+      for (int i = 0; i < threads; ++i ) {
+        DFSClientReader reader = new DFSClientReader(file1, cluster, hash_sha, fileLen, counter);
+        readers[i] = new Thread(reader);
+        readers[i].start();
+      }
+      
+      // wait for them to exit
+      for (int i = 0; i < threads; ++i ) {
+        readers[i].join();
+      }
+      ret = (counter.get() == threads);
+      
+    } catch (InterruptedException e) {
+      System.out.println("Thread got InterruptedException.");
+      e.printStackTrace();
+      ret = false;
+    } catch (Exception e) {
+      e.printStackTrace();
+      ret = false;
+    } finally {
+      fs.delete(file1, false);
+      cluster.shutdown();
+    }
+    return ret;
+  }
+  
+  class DFSClientReader implements Runnable {
+    
+    DFSClient client;
+    Configuration conf;
+    byte[] expected_sha;
+    FileSystem  fs;
+    Path filePath;
+    MiniDFSCluster cluster;
+    int len;
+    Counter counter;
+
+    DFSClientReader(Path file, MiniDFSCluster cluster, byte[] hash_sha, int fileLen, Counter cnt) {
+      filePath = file;
+      this.cluster = cluster;
+      counter = cnt;
+      len = fileLen;
+      conf = new HdfsConfiguration();
+      expected_sha = hash_sha;
+      try {
+        cluster.waitActive();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    
+    public void run() {
+      try {
+        fs = cluster.getNewFileSystemInstance();
+        
+        int bufferSize = len;
+        byte[] buf = new byte[bufferSize];
+
+        InputStream in = fs.open(filePath, bufferSize);
+        
+        // read the whole file
+        IOUtils.readFully(in, buf, 0, bufferSize);
+        
+        // compare with the expected input
+        MessageDigest m = MessageDigest.getInstance("SHA");
+        m.update(buf, 0, bufferSize);
+        byte[] hash_sha = m.digest();
+        
+        buf = null; // GC if needed since there may be too many threads
+        in.close();
+        fs.close();
+
+        assertTrue("hashed keys are not the same size",
+                   hash_sha.length == expected_sha.length);
+
+        assertTrue("hashed keys are not equal",
+                   Arrays.equals(hash_sha, expected_sha));
+        
+        counter.inc(); // count this thread as successful
+        
+        LOG.info("Thread correctly read the block.");
+        
+      } catch (BlockMissingException e) {
+        LOG.info("Bad - BlockMissingException is caught.");
+        e.printStackTrace();
+      } catch (Exception e) {
+        e.printStackTrace();
+      } 
+    }
+  }
+
+  class Counter {
+    int counter;
+    Counter(int n) { counter = n; }
+    public synchronized void inc() { ++counter; }
+    public int get() { return counter; }
+  }
 }
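
A back-of-the-envelope check of the "should never fail" bound cited in the test comments, under the tests' own assumptions:

    public class RetryBoundSketch {
      public static void main(String[] args) {
        // With dfs.datanode.max.xcievers = 2 and one transfer thread reserved
        // for the registry, the datanode serves one reader at a time, so the
        // 50 reader threads fully serialize in the worst case.
        int xcievers = 2, threads = 50;
        int slots = xcievers - 1;            // usable concurrent transfer slots
        int retriesNeeded = threads / slots; // each reader may have to wait out all others
        System.out.println(retriesNeeded);   // 50 -- the retry count tests 2 and 4 use
      }
    }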


