hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1101282 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/
Date Tue, 10 May 2011 00:51:03 GMT
Author: szetszwo
Date: Tue May 10 00:51:03 2011
New Revision: 1101282

URL: http://svn.apache.org/viewvc?rev=1101282&view=rev
Log:
HDFS-1865. Share LeaseRenewer among DFSClients so that there is only a LeaseRenewer thread
per namenode per user.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLease.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1101282&r1=1101281&r2=1101282&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue May 10 00:51:03 2011
@@ -389,6 +389,9 @@ Trunk (unreleased changes)
     HDFS-1890. Improve the name, class and value type of the map
     LeaseRenewer.pendingCreates.  (szetszwo)
 
+    HDFS-1865. Share LeaseRenewer among DFSClients so that there is only a
+    LeaseRenewer thread per namenode per user.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1101282&r1=1101281&r2=1101282&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue May 10 00:51:03 2011
@@ -136,9 +136,14 @@ public class DFSClient implements FSCons
   final LeaseRenewer leaserenewer;
 
   /**
-   * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
-   * lock on leasechecker, followed by lock on an individual DFSOutputStream.
+   * A map from file names to {@link DFSOutputStream} objects
+   * that are currently being written by this client.
+   * Note that a file can only be written by a single client.
    */
+  private final Map<String, DFSOutputStream> filesBeingWritten
+      = new HashMap<String, DFSOutputStream>();
+
+  /** Create a {@link NameNode} proxy */
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
@@ -249,13 +254,14 @@ public class DFSClient implements FSCons
 
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
-    this.leaserenewer = new LeaseRenewer(this, hdfsTimeout);
-
     this.ugi = UserGroupInformation.getCurrentUser();
+    final String authority = nameNodeAddr == null? "null":
+        nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+    this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
     
     String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
-    this.clientName = "DFSClient_" + taskId + "_" +
-                      r.nextInt() + "_" + Thread.currentThread().getId(); 
+    this.clientName = leaserenewer.getClientName(taskId);
+
     defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) 
       conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
@@ -309,19 +315,77 @@ public class DFSClient implements FSCons
     }
   }
 
+  /** Put a file. */
+  void putFileBeingWritten(final String src, final DFSOutputStream out) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.put(src, out);
+    }
+  }
+
+  /** Remove a file. */
+  void removeFileBeingWritten(final String src) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.remove(src);
+    }
+  }
+
+  /** Is file-being-written map empty? */
+  boolean isFilesBeingWrittenEmpty() {
+    synchronized(filesBeingWritten) {
+      return filesBeingWritten.isEmpty();
+    }
+  }
+
+  /** Renew leases */
+  void renewLease() throws IOException {
+    if (clientRunning && !isFilesBeingWrittenEmpty()) {
+      namenode.renewLease(clientName);
+    }
+  }
+
+  /** Abort and release resources held.  Ignore all errors. */
+  void abort() {
+    clientRunning = false;
+    closeAllFilesBeingWritten(true);
+    RPC.stopProxy(rpcNamenode); // close connections to the namenode
+  }
+
+  /** Close/abort all files being written. */
+  private void closeAllFilesBeingWritten(final boolean abort) {
+    for(;;) {
+      final String src;
+      final DFSOutputStream out;
+      synchronized(filesBeingWritten) {
+        if (filesBeingWritten.isEmpty()) {
+          return;
+        }
+        src = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(src);
+      }
+      if (out != null) {
+        try {
+          if (abort) {
+            out.abort();
+          } else {
+            out.close();
+          }
+        } catch(IOException ie) {
+          LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
+              ie);
+        }
+      }
+    }
+  }
+
   /**
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leaserenewer.close();
+      closeAllFilesBeingWritten(false);
       clientRunning = false;
-      try {
-        leaserenewer.interruptAndJoin();
-      } catch (InterruptedException ie) {
-      }
-  
+      leaserenewer.closeClient(this);
       // close connections to the namenode
       RPC.stopProxy(rpcNamenode);
     }
@@ -633,7 +697,7 @@ public class DFSClient implements FSCons
         flag, createParent, replication, blockSize, progress, buffersize,
         conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
-    leaserenewer.put(src, result);
+    leaserenewer.put(src, result, this);
     return result;
   }
   
@@ -680,7 +744,7 @@ public class DFSClient implements FSCons
           flag, createParent, replication, blockSize, progress, buffersize,
           bytesPerChecksum);
     }
-    leaserenewer.put(src, result);
+    leaserenewer.put(src, result, this);
     return result;
   }
   
@@ -759,7 +823,7 @@ public class DFSClient implements FSCons
           + src + " on client " + clientName);
     }
     final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
-    leaserenewer.put(src, result);
+    leaserenewer.put(src, result, this);
     return result;
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1101282&r1=1101281&r2=1101282&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue May 10 00:51:03
2011
@@ -1639,7 +1639,7 @@ class DFSOutputStream extends FSOutputSu
       ExtendedBlock lastBlock = streamer.getBlock();
       closeThreads(false);
       completeFile(lastBlock);
-      dfsClient.leaserenewer.remove(src);
+      dfsClient.leaserenewer.closeFile(src, dfsClient);
     } finally {
       closed = true;
     }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1101282&r1=1101281&r2=1101282&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/LeaseRenewer.java Tue May 10 00:51:03
2011
@@ -18,33 +18,150 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 
-public class LeaseRenewer {
+/**
+ * <p>
+ * Used by {@link DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * When the lease is not renewed before it expires,
+ * the namenode considers the writer as failed and then it may either let
+ * another writer to obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers. 
+ * The same {@link LeaseRenewer} instance is used for renewing lease
+ * for all the {@link DFSClient} to the same namenode and the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link DFSClient}.
+ * Periodically the leases for all the clients are renewed.
+ * A client is removed from the list when the client is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+class LeaseRenewer {
   private static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
 
   static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
-  /** A map from src -> DFSOutputStream of files that are currently being
-   * written by this client.
+
+  /** Get a {@link LeaseRenewer} instance */
+  static LeaseRenewer getInstance(final String authority,
+      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+    return Factory.INSTANCE.get(authority, ugi, dfsc);
+  }
+
+  /** 
+   * A factory for sharing {@link LeaseRenewer} objects
+   * among {@link DFSClient} instances
+   * so that there is only one renewer per authority per user.
    */
-  private final Map<String, DFSOutputStream> filesBeingWritten
-      = new TreeMap<String, DFSOutputStream>();
+  private static class Factory {
+    private static final Factory INSTANCE = new Factory();
+
+    private static class Key {
+      /** Namenode info */
+      final String authority;
+      /** User info */
+      final UserGroupInformation ugi;
+
+      private Key(final String authority, final UserGroupInformation ugi) {
+        if (authority == null) {
+          throw new HadoopIllegalArgumentException("authority == null");
+        } else if (ugi == null) {
+          throw new HadoopIllegalArgumentException("ugi == null");
+        }
+
+        this.authority = authority;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public int hashCode() {
+        return authority.hashCode() ^ ugi.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj != null && obj instanceof Key) {
+          final Key that = (Key)obj;
+          return this.authority.equals(that.authority)
+                 && this.ugi.equals(that.ugi);
+        }
+        return false;        
+      }
+
+      @Override
+      public String toString() {
+        return ugi.getShortUserName() + "@" + authority;
+      }
+    }
+
+    /** A map for per user per namenode renewers. */
+    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
+
+    /** Get a renewer. */
+    private synchronized LeaseRenewer get(final String authority,
+        final UserGroupInformation ugi, final DFSClient dfsc) {
+      final Key k = new Key(authority, ugi);
+      LeaseRenewer r = renewers.get(k);
+      if (r == null) {
+        r = new LeaseRenewer(k);
+        renewers.put(k, r);
+      }
+      r.addClient(dfsc);
+      return r;
+    }
+
+    /** Remove the given renewer. */
+    private synchronized void remove(final LeaseRenewer r) {
+      final LeaseRenewer stored = renewers.get(r.factorykey);
+      //Since a renewer may expire, the stored renewer can be different.
+      if (r == stored) {
+        if (!r.clientsRunning()) {
+          renewers.remove(r.factorykey);
+        }
+      }
+    }
+  }
+
+  private final String clienNamePostfix = DFSClient.r.nextInt()
+      + "_" + Thread.currentThread().getId();
+
   /** The time in milliseconds that the map became empty. */
   private long emptyTime = Long.MAX_VALUE;
   /** A fixed lease renewal time period in milliseconds */
-  private final long renewal;
+  private long renewal = FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
 
   /** A daemon for renewing lease */
   private Daemon daemon = null;
@@ -54,7 +171,8 @@ public class LeaseRenewer {
   /** 
    * A period in milliseconds that the lease renewer thread should run
    * after the map became empty.
-   * If the map is empty for a time period longer than the grace period,
+   * In other words,
+   * if the map is empty for a time period longer than the grace period,
    * the renewer should terminate.  
    */
   private long gracePeriod;
@@ -62,26 +180,68 @@ public class LeaseRenewer {
    * The time period in milliseconds
    * that the renewer sleeps for each iteration. 
    */
-  private volatile long sleepPeriod;
+  private long sleepPeriod;
+
+  private final Factory.Key factorykey;
 
-  private final DFSClient dfsclient;
+  /** A list of clients corresponding to this renewer. */
+  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
 
-  LeaseRenewer(final DFSClient dfsclient, final long timeout) {
-    this.dfsclient = dfsclient;
-    this.renewal = (timeout > 0 && timeout < FSConstants.LEASE_SOFTLIMIT_PERIOD)?

-        timeout/2: FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
+  private LeaseRenewer(Factory.Key factorykey) {
+    this.factorykey = factorykey;
     setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
   }
 
+  /** @return the renewal time in milliseconds. */
+  private synchronized long getRenewalTime() {
+    return renewal;
+  }
+
+  /** @return the client name for the given id. */
+  String getClientName(final String id) {
+    return "DFSClient_" + id + "_" + clienNamePostfix;
+  }
+
+  /** Add a client. */
+  private synchronized void addClient(final DFSClient dfsc) {
+    for(DFSClient c : dfsclients) {
+      if (c == dfsc) {
+        //client already exists, nothing to do.
+        return;
+      }
+    }
+    //client not found, add it
+    dfsclients.add(dfsc);
+
+    //update renewal time
+    if (dfsc.hdfsTimeout > 0) {
+      final long half = dfsc.hdfsTimeout/2;
+      if (half < renewal) {
+        this.renewal = half;
+      }
+    }
+  }
+
+  private synchronized boolean clientsRunning() {
+    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+      if (!i.next().clientRunning) {
+        i.remove();
+      }
+    }
+    return !dfsclients.isEmpty();
+  }
+
+  private synchronized long getSleepPeriod() {
+    return sleepPeriod;    
+  }
+
   /** Set the grace period and adjust the sleep period accordingly. */
-  void setGraceSleepPeriod(final long gracePeriod) {
+  synchronized void setGraceSleepPeriod(final long gracePeriod) {
     if (gracePeriod < 100L) {
       throw new HadoopIllegalArgumentException(gracePeriod
           + " = gracePeriod < 100ms is too small.");
     }
-    synchronized(this) {
-      this.gracePeriod = gracePeriod;
-    }
+    this.gracePeriod = gracePeriod;
     final long half = gracePeriod/2;
     this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
         half: LEASE_RENEWER_SLEEP_DEFAULT;
@@ -98,9 +258,10 @@ public class LeaseRenewer {
         && System.currentTimeMillis() - emptyTime > gracePeriod;
   }
 
-  synchronized void put(String src, DFSOutputStream out) {
-    if (dfsclient.clientRunning) {
-      if (daemon == null || isRenewerExpired()) {
+  synchronized void put(final String src, final DFSOutputStream out,
+      final DFSClient dfsc) {
+    if (dfsc.clientRunning) {
+      if (!isRunning() || isRenewerExpired()) {
         //start a new deamon with a new id.
         final int id = ++currentId;
         daemon = new Daemon(new Runnable() {
@@ -113,24 +274,68 @@ public class LeaseRenewer {
                 LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
                     + " is interrupted.", e);
               }
+            } finally {
+              synchronized(LeaseRenewer.this) {
+                Factory.INSTANCE.remove(LeaseRenewer.this);
+              }
             }
           }
         });
         daemon.start();
       }
-      filesBeingWritten.put(src, out);
+      dfsc.putFileBeingWritten(src, out);
       emptyTime = Long.MAX_VALUE;
     }
   }
-  
-  synchronized void remove(String src) {
-    filesBeingWritten.remove(src);
-    if (filesBeingWritten.isEmpty() && emptyTime == Long.MAX_VALUE) {
-      //discover the first time that the map is empty.
-      emptyTime = System.currentTimeMillis();
+
+  /** Close a file. */
+  void closeFile(final String src, final DFSClient dfsc) {
+    dfsc.removeFileBeingWritten(src);
+
+    synchronized(this) {
+      //update emptyTime if necessary
+      if (emptyTime == Long.MAX_VALUE) {
+        for(DFSClient c : dfsclients) {
+          if (!c.isFilesBeingWrittenEmpty()) {
+            //found a non-empty file-being-written map
+            return;
+          }
+        }
+        //discover the first time that all file-being-written maps are empty.
+        emptyTime = System.currentTimeMillis();
+      }
+    }
+  }
+
+  /** Close the given client. */
+  synchronized void closeClient(final DFSClient dfsc) {
+    dfsclients.remove(dfsc);
+    if (dfsclients.isEmpty()) {
+      if (!isRunning() || isRenewerExpired()) {
+        Factory.INSTANCE.remove(LeaseRenewer.this);
+        return;
+      }
+      if (emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the client list is empty.
+        emptyTime = System.currentTimeMillis();
+      }
+    }
+
+    //update renewal time
+    if (renewal == dfsc.hdfsTimeout/2) {
+      long min = FSConstants.LEASE_SOFTLIMIT_PERIOD;
+      for(DFSClient c : dfsclients) {
+        if (c.hdfsTimeout > 0) {
+          final long half = c.hdfsTimeout;
+          if (half < min) {
+            min = half;
+          }
+        }
+      }
+      renewal = min/2;
     }
   }
-  
+
   void interruptAndJoin() throws InterruptedException {
     Daemon daemonCopy = null;
     synchronized (this) {
@@ -148,53 +353,27 @@ public class LeaseRenewer {
     }
   }
 
-  void close() {
-    while (true) {
-      String src;
-      OutputStream out;
-      synchronized (this) {
-        if (filesBeingWritten.isEmpty()) {
-          return;
-        }
-        src = filesBeingWritten.keySet().iterator().next();
-        out = filesBeingWritten.remove(src);
-      }
-      if (out != null) {
-        try {
-          out.close();
-        } catch (IOException ie) {
-          LOG.error("Exception closing file " + src+ " : " + ie, ie);
-        }
-      }
-    }
-  }
-
-  /**
-   * Abort all open files. Release resources held. Ignore all errors.
-   */
-  synchronized void abort() {
-    dfsclient.clientRunning = false;
-    for(Map.Entry<String, DFSOutputStream> e : filesBeingWritten.entrySet()) {
-      final DFSOutputStream out = e.getValue();
-      if (out != null) {
-        try {
-          out.abort();
-        } catch (IOException ie) {
-          LOG.error("Failed to abort file " + e.getKey(), ie);
-        }
-      }
-    }
-    filesBeingWritten.clear();
-    RPC.stopProxy(dfsclient.rpcNamenode); // close connections to the namenode
-  }
-
   private void renew() throws IOException {
+    final List<DFSClient> copies;
     synchronized(this) {
-      if (filesBeingWritten.isEmpty()) {
-        return;
+      copies = new ArrayList<DFSClient>(dfsclients);
+    }
+    //sort the client names for finding out repeated names.
+    Collections.sort(copies, new Comparator<DFSClient>() {
+      @Override
+      public int compare(final DFSClient left, final DFSClient right) {
+        return left.clientName.compareTo(right.clientName);
+      }
+    });
+    String previousName = "";
+    for(int i = 0; i < copies.size(); i++) {
+      final DFSClient c = copies.get(i);
+      //skip if current client name is the same as the previous name.
+      if (!c.clientName.equals(previousName)) {
+        c.renewLease();
+        previousName = c.clientName;
       }
     }
-    dfsclient.namenode.renewLease(dfsclient.clientName);
   }
 
   /**
@@ -203,20 +382,25 @@ public class LeaseRenewer {
    */
   private void run(final int id) throws InterruptedException {
     for(long lastRenewed = System.currentTimeMillis();
-        dfsclient.clientRunning && !Thread.interrupted();
-        Thread.sleep(sleepPeriod)) {
-      if (System.currentTimeMillis() - lastRenewed >= renewal) {
+        clientsRunning() && !Thread.interrupted();
+        Thread.sleep(getSleepPeriod())) {
+      if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
         try {
           renew();
           lastRenewed = System.currentTimeMillis();
         } catch (SocketTimeoutException ie) {
-          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
-              + (renewal/1000) + " seconds.  Aborting ...", ie);
-          abort();
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (getRenewalTime()/1000) + " seconds.  Aborting ...", ie);
+          synchronized (this) {
+            for(DFSClient c : dfsclients) {
+              c.abort();
+            }
+          }
           break;
         } catch (IOException ie) {
-          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
-              + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (getRenewalTime()/1000) + " seconds.  Will retry shortly ...",
+              ie);
         }
       }
 
@@ -229,13 +413,27 @@ public class LeaseRenewer {
     }
   }
 
-  /** {@inheritDoc} */
+  @Override
   public String toString() {
-    String s = getClass().getSimpleName();
+    String s = getClass().getSimpleName() + ":" + factorykey;
     if (LOG.isTraceEnabled()) {
-      return s + "@" + dfsclient + ": "
+      return s + ", clients=" +  clientsString() + ", "
              + StringUtils.stringifyException(new Throwable("for testing"));
     }
     return s;
   }
+
+  /** Get the names of all clients */
+  private synchronized String clientsString() {
+    if (dfsclients.isEmpty()) {
+      return "[]";
+    } else {
+      final StringBuilder b = new StringBuilder("[").append(
+          dfsclients.get(0).clientName);
+      for(int i = 1; i < dfsclients.size(); i++) {
+        b.append(", ").append(dfsclients.get(i).clientName);
+      }
+      return b.append("]").toString();
+    }
+  }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLease.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLease.java?rev=1101282&r1=1101281&r2=1101282&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLease.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLease.java Tue May 10 00:51:03
2011
@@ -17,25 +17,32 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.*;
+import java.io.DataOutputStream;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
-public class TestLease extends junit.framework.TestCase {
+public class TestLease {
   static boolean hasLease(MiniDFSCluster cluster, Path src) {
     return cluster.getNamesystem().leaseManager.getLeaseByPath(src.toString()) != null;
   }
   
   final Path dir = new Path("/test/lease/");
 
+  @Test
   public void testLease() throws Exception {
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     try {
       FileSystem fs = cluster.getFileSystem();
-      assertTrue(fs.mkdirs(dir));
+      Assert.assertTrue(fs.mkdirs(dir));
       
       Path a = new Path(dir, "a");
       Path b = new Path(dir, "b");
@@ -43,24 +50,56 @@ public class TestLease extends junit.fra
       DataOutputStream a_out = fs.create(a);
       a_out.writeBytes("something");
 
-      assertTrue(hasLease(cluster, a));
-      assertTrue(!hasLease(cluster, b));
+      Assert.assertTrue(hasLease(cluster, a));
+      Assert.assertTrue(!hasLease(cluster, b));
       
       DataOutputStream b_out = fs.create(b);
       b_out.writeBytes("something");
 
-      assertTrue(hasLease(cluster, a));
-      assertTrue(hasLease(cluster, b));
+      Assert.assertTrue(hasLease(cluster, a));
+      Assert.assertTrue(hasLease(cluster, b));
 
       a_out.close();
       b_out.close();
 
-      assertTrue(!hasLease(cluster, a));
-      assertTrue(!hasLease(cluster, b));
+      Assert.assertTrue(!hasLease(cluster, a));
+      Assert.assertTrue(!hasLease(cluster, b));
       
       fs.delete(dir, true);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testFactory() throws Exception {
+    final String[] groups = new String[]{"supergroup"};
+    final UserGroupInformation[] ugi = new UserGroupInformation[3];
+    for(int i = 0; i < ugi.length; i++) {
+      ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups);
+    }
+
+    final Configuration conf = new Configuration();
+    final DFSClient c1 = createDFSClientAs(ugi[0], conf);
+    final DFSClient c2 = createDFSClientAs(ugi[0], conf);
+    Assert.assertEquals(c1.leaserenewer, c2.leaserenewer);
+    final DFSClient c3 = createDFSClientAs(ugi[1], conf);
+    Assert.assertTrue(c1.leaserenewer != c3.leaserenewer);
+    final DFSClient c4 = createDFSClientAs(ugi[1], conf);
+    Assert.assertEquals(c3.leaserenewer, c4.leaserenewer);
+    final DFSClient c5 = createDFSClientAs(ugi[2], conf);
+    Assert.assertTrue(c1.leaserenewer != c5.leaserenewer);
+    Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
+  }
+  
+  static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
+  static public DFSClient createDFSClientAs(UserGroupInformation ugi, 
+      final Configuration conf) throws Exception {
+    return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+      @Override
+      public DFSClient run() throws Exception {
+        return new DFSClient(null, mcp, conf, null);
+      }
+    });
+  }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=1101282&r1=1101281&r2=1101282&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Tue May
10 00:51:03 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +59,8 @@ public class TestReadWhileWriting {
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
 
     // create cluster
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(4).build();
     try {
       //change the lease limits.
       cluster.setLeasePeriod(SOFT_LEASE_LIMIT, HARD_LEASE_LIMIT);
@@ -93,7 +95,16 @@ public class TestReadWhileWriting {
         //sleep to let the lease is expired.
         Thread.sleep(2*SOFT_LEASE_LIMIT);
   
-        final DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.newInstance(conf);
+        final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+        final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+            current.getShortUserName() + "x", new String[]{"supergroup"});
+        final DistributedFileSystem dfs = ugi.doAs(
+            new PrivilegedExceptionAction<DistributedFileSystem>() {
+          @Override
+          public DistributedFileSystem run() throws Exception {
+            return (DistributedFileSystem)FileSystem.newInstance(conf);
+          }
+        });
         final FSDataOutputStream out = append(dfs, p);
         write(out, 0, half);
         out.close();



Mime
View raw message