hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r679930 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Date Fri, 25 Jul 2008 23:42:56 GMT
Author: hairong
Date: Fri Jul 25 16:42:56 2008
New Revision: 679930

URL: http://svn.apache.org/viewvc?rev=679930&view=rev
Log:
HADOOP-3169. LeaseChecker daemon should not be started in DFSClient constructor. Contributed
by Tsz Wo (Nicholas), SZE.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=679930&r1=679929&r2=679930&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 25 16:42:56 2008
@@ -112,6 +112,9 @@
     HADOOP-3747. Adds counter suport for MultipleOutputs. 
     (Alejandro Abdelnur via ddas)
 
+    HADOOP-3169. LeaseChecker daemon should not be started in DFSClient
+    constructor. (TszWo (Nicholas), SZE via hairong)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 
@@ -855,7 +858,7 @@
   
     HADOOP-3588. Fixed usability issues with archives. (mahadev)
 
-    HADOOP-3536. Uncaught exception in DataBlockScanner.
+    HADOOP-3635. Uncaught exception in DataBlockScanner.
     (Tsz Wo (Nicholas), SZE via hairong)
 
     HADOOP-3539. Exception when closing DFSClient while multiple files are

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=679930&r1=679929&r2=679930&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jul 25 16:42:56 2008
@@ -61,7 +61,7 @@
  * filesystem tasks.
  *
  ********************************************************/
-public class DFSClient implements FSConstants {
+public class DFSClient implements FSConstants, java.io.Closeable {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -71,7 +71,7 @@
   volatile boolean clientRunning = true;
   Random r = new Random();
   String clientName;
-  Daemon leaseChecker;
+  private final LeaseChecker leasechecker = new LeaseChecker();
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
@@ -81,12 +81,6 @@
   final int writePacketSize;
   private FileSystem.Statistics stats;
     
-  /**
-   * A map from name -> DFSOutputStream of files that are currently being
-   * written by this client.
-   */
-  private TreeMap<String, OutputStream> pendingCreates =
-    new TreeMap<String, OutputStream>();
  
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
@@ -186,8 +180,6 @@
     }
     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
-    this.leaseChecker = new Daemon(new LeaseChecker());
-    this.leaseChecker.start();
   }
 
   public DFSClient(InetSocketAddress nameNodeAddr, 
@@ -207,33 +199,13 @@
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
-  public void close() throws IOException {
-    // synchronize in here so that we don't need to change the API
-    synchronized (this) {
-      checkOpen();
-      synchronized (pendingCreates) {
-        while (!pendingCreates.isEmpty()) {
-          String name = pendingCreates.firstKey();
-          OutputStream out = pendingCreates.remove(name);
-          if (out != null) {
-            try {
-              out.close();
-            } catch (IOException ie) {
-              System.err.println("Exception closing file " + name);
-              ie.printStackTrace();
-            }
-          }
-        }
-      }
-      this.clientRunning = false;
-      try {
-        leaseChecker.join();
-      } catch (InterruptedException ie) {
-      }
-      
-      // close connections to the namenode
-      RPC.stopProxy(rpcNamenode);
-    }
+  public synchronized void close() throws IOException {
+    checkOpen();
+    clientRunning = false;
+    leasechecker.close();
+
+    // close connections to the namenode
+    RPC.stopProxy(rpcNamenode);
   }
 
   /**
@@ -477,9 +449,7 @@
     OutputStream result = new DFSOutputStream(src, masked,
         overwrite, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
-    synchronized (pendingCreates) {
-      pendingCreates.put(src, result);
-    }
+    leasechecker.put(src, result);
     return result;
   }
 
@@ -508,9 +478,7 @@
     }
     OutputStream result = new DFSOutputStream(src, buffersize, progress,
         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
-    synchronized(pendingCreates) {
-      pendingCreates.put(src, result);
-    }
+    leasechecker.put(src, result);
     return result;
   }
 
@@ -803,35 +771,98 @@
     throw new IOException("No live nodes contain current block");
   }
 
-  /***************************************************************
-   * Periodically check in with the namenode and renew all the leases
-   * when the lease period is half over.
-   ***************************************************************/
-  class LeaseChecker implements Runnable {
+  boolean isLeaseCheckerStarted() {
+    return leasechecker.daemon != null;
+  }
+
+  /** Lease management*/
+  private class LeaseChecker implements Runnable {
+    /** A map from src -> DFSOutputStream of files that are currently being
+     * written by this client.
+     */
+    private final SortedMap<String, OutputStream> pendingCreates
+        = new TreeMap<String, OutputStream>();
+
+    private Daemon daemon = null;
+    
+    synchronized void put(String src, OutputStream out) {
+      if (clientRunning) {
+        if (daemon == null) {
+          daemon = new Daemon(this);
+          daemon.start();
+        }
+        pendingCreates.put(src, out);
+      }
+    }
+    
+    synchronized void remove(String src) {
+      pendingCreates.remove(src);
+    }
+
+    synchronized void close() {
+      while (!pendingCreates.isEmpty()) {
+        String src = pendingCreates.firstKey();
+        OutputStream out = pendingCreates.remove(src);
+        if (out != null) {
+          try {
+            out.close();
+          } catch (IOException ie) {
+            System.err.println("Exception closing file " + src);
+            ie.printStackTrace();
+          }
+        }
+      }
+      
+      if (daemon != null) {
+        daemon.interrupt();
+      }
+    }
+
+    private void renew() throws IOException {
+      synchronized(this) {
+        if (pendingCreates.isEmpty()) {
+          return;
+        }
+      }
+      namenode.renewLease(clientName);
+    }
+
     /**
+     * Periodically check in with the namenode and renew all the leases
+     * when the lease period is half over.
      */
     public void run() {
       long lastRenewed = 0;
       while (clientRunning) {
         if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
           try {
-            synchronized (pendingCreates) {
-              if (pendingCreates.size() > 0)
-                namenode.renewLease(clientName);
-            }
+            renew();
             lastRenewed = System.currentTimeMillis();
           } catch (IOException ie) {
-            String err = StringUtils.stringifyException(ie);
-            LOG.warn("Problem renewing lease for " + clientName +
-                     ": " + err);
+            LOG.warn("Problem renewing lease for " + clientName, ie);
           }
         }
+
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ie) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + " is interrupted.", ie);
+          }
+          return;
         }
       }
     }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      String s = getClass().getSimpleName();
+      if (LOG.isTraceEnabled()) {
+        return s + "@" + DFSClient.this + ": "
+               + StringUtils.stringifyException(new Throwable("for testing"));
+      }
+      return s;
+    }
   }
 
   /** Utility class to encapsulate data node info and its ip address. */
@@ -2792,10 +2823,7 @@
     @Override
     public void close() throws IOException {
       closeInternal();
-
-      synchronized (pendingCreates) {
-        pendingCreates.remove(src);
-      }
+      leasechecker.remove(src);
       
       if (s != null) {
         s.close();
@@ -2918,4 +2946,10 @@
                + StringUtils.stringifyException(ie));
     }
   }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + "[clientName=" + clientName
+        + ", ugi=" + ugi + "]"; 
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=679930&r1=679929&r2=679930&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Jul 25 16:42:56 2008
@@ -21,6 +21,8 @@
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -65,4 +67,47 @@
     }
   }
 
+  public void testDFSClient() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      final Path filepath = new Path("/test/LeaseChecker/foo");
+      final long millis = System.currentTimeMillis();
+
+      {
+        DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+  
+        //create a file
+        FSDataOutputStream out = dfs.create(filepath);
+        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+  
+        //write something and close
+        out.writeLong(millis);
+        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        out.close();
+        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        dfs.close();
+      }
+
+      {
+        DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+
+        //open and check the file
+        FSDataInputStream in = dfs.open(filepath);
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertEquals(millis, in.readLong());
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        in.close();
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        dfs.close();
+      }
+    }
+    finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }



Mime
View raw message