hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1407625 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/ipc/ src/hdfs/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/
Date Fri, 09 Nov 2012 20:51:10 GMT
Author: szetszwo
Date: Fri Nov  9 20:51:09 2012
New Revision: 1407625

URL: http://svn.apache.org/viewvc?rev=1407625&view=rev
Log:
HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients" and the related
JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810, HDFS-3646 and HDFS-2240.

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Nov  9 20:51:09 2012
@@ -313,6 +313,9 @@ Release 1.1.1 - Unreleased
     HADOOP-8995. Remove unnecessary bogus exception from Configuration.java.
     (Jing Zhao via suresh)
 
+    HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients"
+    and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810,
+    HDFS-3646 and HDFS-2240. (szetszwo)
 
   BUG FIXES
 

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java Fri Nov  9 20:51:09 2012
@@ -114,6 +114,21 @@ public class Client {
   }
   
   /**
+   * The time after which an RPC will time out. If ping is not enabled (via
+   * ipc.client.ping), then the timeout value is the same as the pingInterval.
+   * If ping is enabled, then there is no timeout value.
+   * 
+   * @param conf Configuration
+   * @return the timeout period in milliseconds. -1 if no timeout value is set
+   */
+  final public static int getTimeout(Configuration conf) {
+    if (!conf.getBoolean("ipc.client.ping", true)) {
+      return getPingInterval(conf);
+    }
+    return -1;
+  }
+  
+  /**
    * Increment this client's reference count
    *
    */

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Nov  9 20:51:09 2012
@@ -80,13 +80,12 @@ public class DFSClient implements FSCons
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
   public final ClientProtocol namenode;
-  private final ClientProtocol rpcNamenode;
+  final ClientProtocol rpcNamenode;
   private final InetSocketAddress nnAddress;
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
-  Random r = new Random();
+  static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
@@ -108,7 +107,18 @@ public class DFSClient implements FSCons
    */
   private volatile boolean serverSupportsHdfs630 = true;
   private volatile boolean serverSupportsHdfs200 = true;
- 
+  final int hdfsTimeout;    // timeout value for a DFS operation.
+  private final String authority;
+
+  /**
+   * A map from file names to {@link DFSOutputStream} objects
+   * that are currently being written by this client.
+   * Note that a file can only be written by a single client.
+   */
+  private final Map<String, DFSOutputStream> filesBeingWritten
+      = new HashMap<String, DFSOutputStream>();
+
+  /** Create a {@link NameNode} proxy */ 
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
@@ -251,14 +261,14 @@ public class DFSClient implements FSCons
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
 
+    this.hdfsTimeout = Client.getTimeout(conf);
     ugi = UserGroupInformation.getCurrentUser();
+    this.authority = nameNodeAddr == null? "null":
+      nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+    String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
+    this.clientName = "DFSClient_" + taskId + "_" + 
+        r.nextInt()  + "_" + Thread.currentThread().getId();
 
-    String taskId = conf.get("mapred.task.id");
-    if (taskId != null) {
-      this.clientName = "DFSClient_" + taskId; 
-    } else {
-      this.clientName = "DFSClient_" + r.nextInt();
-    }
     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
 
@@ -310,20 +320,116 @@ public class DFSClient implements FSCons
       throw result;
     }
   }
+
+  /** Return the lease renewer instance. The renewer thread won't start
+   *  until the first output stream is created. The same instance will
+   *  be returned until all output streams are closed.
+   */
+  public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
+      return LeaseRenewer.getInstance(authority, ugi, this);
+  }
+
+  /** Get a lease and start automatic renewal */
+  private void beginFileLease(final String src, final DFSOutputStream out) 
+      throws IOException {
+    getLeaseRenewer().put(src, out, this);
+  }
+
+  /** Stop renewal of lease for the file. */
+  void endFileLease(final String src) throws IOException {
+    getLeaseRenewer().closeFile(src, this);
+  }
     
+
+  /** Put a file. Only called from LeaseRenewer, where proper locking is
+   *  enforced to consistently update its local dfsclients array and 
+   *  client's filesBeingWritten map.
+   */
+  void putFileBeingWritten(final String src, final DFSOutputStream out) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.put(src, out);
+    }
+  }
+
+  /** Remove a file. Only called from LeaseRenewer. */
+  void removeFileBeingWritten(final String src) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.remove(src);
+    }
+  }
+
+  /** Is file-being-written map empty? */
+  boolean isFilesBeingWrittenEmpty() {
+    synchronized(filesBeingWritten) {
+      return filesBeingWritten.isEmpty();
+    }
+  }
+
+  /**
+   * Renew leases.
+   * @return true if lease was renewed. May return false if this
+   * client has been closed or has no files open.
+   **/
+  boolean renewLease() throws IOException {
+    if (clientRunning && !isFilesBeingWrittenEmpty()) {
+      namenode.renewLease(clientName);
+      return true;
+    }
+    return false;
+  }
+
+  /** Abort and release resources held.  Ignore all errors. */
+  void abort() {
+    clientRunning = false;
+    closeAllFilesBeingWritten(true);
+    
+    try {
+      // remove reference to this client and stop the renewer,
+      // if there are no more clients under the renewer.
+      getLeaseRenewer().closeClient(this);
+    } catch (IOException ioe) {
+      LOG.info("Exception occurred while aborting the client. " + ioe);
+    }
+    RPC.stopProxy(rpcNamenode); // close connections to the namenode
+  }
+
+  /** Close/abort all files being written. */
+  private void closeAllFilesBeingWritten(final boolean abort) {
+    for(;;) {
+      final String src;
+      final DFSOutputStream out;
+      synchronized(filesBeingWritten) {
+        if (filesBeingWritten.isEmpty()) {
+          return;
+        }
+        src = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(src);
+      }
+      if (out != null) {
+        try {
+          if (abort) {
+            out.abort();
+          } else {
+            out.close();
+          }
+        } catch(IOException ie) {
+          LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
+              ie);
+        }
+      }
+    }
+  }
+
   /**
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leasechecker.close();
+      closeAllFilesBeingWritten(false);
       clientRunning = false;
-      try {
-        leasechecker.interruptAndJoin();
-      } catch (InterruptedException ie) {
-      }
-  
+
+      getLeaseRenewer().closeClient(this);
       // close connections to the namenode
       RPC.stopProxy(rpcNamenode);
     }
@@ -757,10 +863,10 @@ public class DFSClient implements FSCons
     }
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
-    OutputStream result = new DFSOutputStream(src, masked,
+    final DFSOutputStream result = new DFSOutputStream(src, masked,
         overwrite, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -815,7 +921,7 @@ public class DFSClient implements FSCons
     }
     final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -1392,117 +1498,6 @@ public class DFSClient implements FSCons
     throw new IOException("No live nodes contain current block");
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
-  /** Lease management*/
-  class LeaseChecker implements Runnable {
-    /** A map from src -> DFSOutputStream of files that are currently being
-     * written by this client.
-     */
-    private final SortedMap<String, OutputStream> pendingCreates
-        = new TreeMap<String, OutputStream>();
-
-    private Daemon daemon = null;
-    
-    synchronized void put(String src, OutputStream out) {
-      if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
-          daemon.start();
-        }
-        pendingCreates.put(src, out);
-      }
-    }
-    
-    synchronized void remove(String src) {
-      pendingCreates.remove(src);
-    }
-    
-    void interruptAndJoin() throws InterruptedException {
-      Daemon daemonCopy = null;
-      synchronized (this) {
-        if (daemon != null) {
-          daemon.interrupt();
-          daemonCopy = daemon;
-        }
-      }
-     
-      if (daemonCopy != null) {
-        LOG.debug("Wait for lease checker to terminate");
-        daemonCopy.join();
-      }
-    }
-
-    void close() {
-      while (true) {
-        String src;
-        OutputStream out;
-        synchronized (this) {
-          if (pendingCreates.isEmpty()) {
-            return;
-          }
-          src = pendingCreates.firstKey();
-          out = pendingCreates.remove(src);
-        }
-        if (out != null) {
-          try {
-            out.close();
-          } catch (IOException ie) {
-            LOG.error("Exception closing file " + src+ " : " + ie, ie);
-          }
-        }
-      }
-    }
-
-    private void renew() throws IOException {
-      synchronized(this) {
-        if (pendingCreates.isEmpty()) {
-          return;
-        }
-      }
-      namenode.renewLease(clientName);
-    }
-
-    /**
-     * Periodically check in with the namenode and renew all the leases
-     * when the lease period is half over.
-     */
-    public void run() {
-      long lastRenewed = 0;
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
-          try {
-            renew();
-            lastRenewed = System.currentTimeMillis();
-          } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName, ie);
-          }
-        }
-
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
-          }
-          return;
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-      String s = getClass().getSimpleName();
-      if (LOG.isTraceEnabled()) {
-        return s + "@" + DFSClient.this + ": "
-               + StringUtils.stringifyException(new Throwable("for testing"));
-      }
-      return s;
-    }
-  }
-
   /** Utility class to encapsulate data node info and its address. */
   private static class DNAddrPair {
     DatanodeInfo info;
@@ -3994,12 +3989,12 @@ public class DFSClient implements FSCons
           throw e;
       }
       closeInternal();
-      leasechecker.remove(src);
       
       if (s != null) {
         s.close();
         s = null;
       }
+      endFileLease(src);
     }
     
     /**
@@ -4012,6 +4007,20 @@ public class DFSClient implements FSCons
       closed = true;
     }
  
+    /**
+     * Aborts this output stream and releases any system 
+     * resources associated with this stream.
+     */
+    synchronized void abort() throws IOException {
+      if (closed) {
+        return;
+      }
+      setLastException(new IOException("Lease timeout of "
+          + (hdfsTimeout / 1000) + " seconds expired."));
+      closeThreads();
+      endFileLease(src);
+    }
+ 
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {
@@ -4080,6 +4089,16 @@ public class DFSClient implements FSCons
         while (!fileComplete) {
           fileComplete = namenode.complete(src, clientName);
           if (!fileComplete) {
+            if (!clientRunning ||
+                  (hdfsTimeout > 0 &&
+                   localstart + hdfsTimeout < System.currentTimeMillis())) {
+                String msg = "Unable to close file because dfsclient " +
+                              " was unable to contact the HDFS servers." +
+                              " clientRunning " + clientRunning +
+                              " hdfsTimeout " + hdfsTimeout;
+                LOG.info(msg);
+                throw new IOException(msg);
+            }
             try {
               Thread.sleep(400);
               if (System.currentTimeMillis() - localstart > 5000) {

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1407625&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java Fri Nov  9 20:51:09 2012
@@ -0,0 +1,457 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * <p>
+ * Used by {@link DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * When the lease is not renewed before it expires,
+ * the namenode considers the writer as failed and then it may either let
+ * another writer obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers. 
+ * The same {@link LeaseRenewer} instance is used for renewing lease
+ * for all the {@link DFSClient} to the same namenode and the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link DFSClient}.
+ * Periodically the leases for all the clients are renewed.
+ * A client is removed from the list when the client is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+class LeaseRenewer {
+  static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
+
+  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+
+  /** Get a {@link LeaseRenewer} instance */
+  static LeaseRenewer getInstance(final String authority,
+      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
+    r.addClient(dfsc);
+    return r;
+  }
+
+  /** 
+   * A factory for sharing {@link LeaseRenewer} objects
+   * among {@link DFSClient} instances
+   * so that there is only one renewer per authority per user.
+   */
+  private static class Factory {
+    private static final Factory INSTANCE = new Factory();
+
+    private static class Key {
+      /** Namenode info */
+      final String authority;
+      /** User info */
+      final UserGroupInformation ugi;
+
+      private Key(final String authority, final UserGroupInformation ugi) {
+        if (authority == null) {
+          throw new IllegalArgumentException("authority == null");
+        } else if (ugi == null) {
+          throw new IllegalArgumentException("ugi == null");
+        }
+
+        this.authority = authority;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public int hashCode() {
+        return authority.hashCode() ^ ugi.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj != null && obj instanceof Key) {
+          final Key that = (Key)obj;
+          return this.authority.equals(that.authority)
+                 && this.ugi.equals(that.ugi);
+        }
+        return false;        
+      }
+
+      @Override
+      public String toString() {
+        return ugi.getShortUserName() + "@" + authority;
+      }
+    }
+
+    /** A map for per user per namenode renewers. */
+    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
+
+    /** Get a renewer. */
+    private synchronized LeaseRenewer get(final String authority,
+        final UserGroupInformation ugi) {
+      final Key k = new Key(authority, ugi);
+      LeaseRenewer r = renewers.get(k);
+      if (r == null) {
+        r = new LeaseRenewer(k);
+        renewers.put(k, r);
+      }
+      return r;
+    }
+
+    /** Remove the given renewer. */
+    private synchronized void remove(final LeaseRenewer r) {
+      final LeaseRenewer stored = renewers.get(r.factorykey);
+      //Since a renewer may expire, the stored renewer can be different.
+      if (r == stored) {
+        if (!r.clientsRunning()) {
+          renewers.remove(r.factorykey);
+        }
+      }
+    }
+  }
+
+  /** The time in milliseconds that the map became empty. */
+  private long emptyTime = Long.MAX_VALUE;
+  /** A fixed lease renewal time period in milliseconds */
+  private long renewal = FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
+
+  /** A daemon for renewing lease */
+  private Daemon daemon = null;
+  /** Only the daemon with currentId should run. */
+  private int currentId = 0;
+
+  /** 
+   * A period in milliseconds that the lease renewer thread should run
+   * after the map became empty.
+   * In other words,
+   * if the map is empty for a time period longer than the grace period,
+   * the renewer should terminate.  
+   */
+  private long gracePeriod;
+  /**
+   * The time period in milliseconds
+   * that the renewer sleeps for each iteration. 
+   */
+  private long sleepPeriod;
+
+  private final Factory.Key factorykey;
+
+  /** A list of clients corresponding to this renewer. */
+  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
+
+  private LeaseRenewer(Factory.Key factorykey) {
+    this.factorykey = factorykey;
+    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+  }
+
+  /** @return the renewal time in milliseconds. */
+  private synchronized long getRenewalTime() {
+    return renewal;
+  }
+
+  /** Add a client. */
+  private synchronized void addClient(final DFSClient dfsc) {
+    for(DFSClient c : dfsclients) {
+      if (c == dfsc) {
+        //client already exists, nothing to do.
+        return;
+      }
+    }
+    //client not found, add it
+    dfsclients.add(dfsc);
+
+    //update renewal time
+    if (dfsc.hdfsTimeout > 0) {
+      final long half = dfsc.hdfsTimeout/2;
+      if (half < renewal) {
+        this.renewal = half;
+      }
+    }
+  }
+
+  private synchronized boolean clientsRunning() {
+    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+      if (!i.next().clientRunning) {
+        i.remove();
+      }
+    }
+    return !dfsclients.isEmpty();
+  }
+
+  private synchronized long getSleepPeriod() {
+    return sleepPeriod;    
+  }
+
+  /** Set the grace period and adjust the sleep period accordingly. */
+  synchronized void setGraceSleepPeriod(final long gracePeriod) {
+    unsyncSetGraceSleepPeriod(gracePeriod);
+  }
+
+  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
+    if (gracePeriod < 100L) {
+      throw new IllegalArgumentException(gracePeriod
+          + " = gracePeriod < 100ms is too small.");
+    }
+    this.gracePeriod = gracePeriod;
+    final long half = gracePeriod/2;
+    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+        half: LEASE_RENEWER_SLEEP_DEFAULT;
+  }
+
+  /** Is the client list empty? Holds the renewer lock like other accessors. */
+  public synchronized boolean isEmpty() {
+    return dfsclients.isEmpty();
+  }
+
+  /** Is the daemon running? */
+  synchronized boolean isRunning() {
+    return daemon != null && daemon.isAlive();
+  }
+
+  /** Is the empty period longer than the grace period? */  
+  private synchronized boolean isRenewerExpired() {
+    return emptyTime != Long.MAX_VALUE
+        && System.currentTimeMillis() - emptyTime > gracePeriod;
+  }
+
+  synchronized void put(final String src, final DFSOutputStream out,
+      final DFSClient dfsc) {
+    if (dfsc.clientRunning) {
+      if (!isRunning() || isRenewerExpired()) {
+        //start a new daemon with a new id.
+        final int id = ++currentId;
+        daemon = new Daemon(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              LeaseRenewer.this.run(id);
+            } catch(InterruptedException e) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+                    + " is interrupted.", e);
+              }
+            } finally {
+              synchronized(LeaseRenewer.this) {
+                Factory.INSTANCE.remove(LeaseRenewer.this);
+              }
+            }
+          }
+        });
+        daemon.start();
+      }
+      dfsc.putFileBeingWritten(src, out);
+      emptyTime = Long.MAX_VALUE;
+    }
+  }
+
+  /** Close a file. */
+  void closeFile(final String src, final DFSClient dfsc) {
+    dfsc.removeFileBeingWritten(src);
+
+    synchronized(this) {
+      if (dfsc.isFilesBeingWrittenEmpty()) {
+        dfsclients.remove(dfsc);
+      }
+      //update emptyTime if necessary
+      if (emptyTime == Long.MAX_VALUE) {
+        for(DFSClient c : dfsclients) {
+          if (!c.isFilesBeingWrittenEmpty()) {
+            //found a non-empty file-being-written map
+            return;
+          }
+        }
+        //discover the first time that all file-being-written maps are empty.
+        emptyTime = System.currentTimeMillis();
+      }
+    }
+  }
+
+  /** Close the given client. */
+  synchronized void closeClient(final DFSClient dfsc) {
+    dfsclients.remove(dfsc);
+    if (dfsclients.isEmpty()) {
+      if (!isRunning() || isRenewerExpired()) {
+        Factory.INSTANCE.remove(LeaseRenewer.this);
+        return;
+      }
+      if (emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the client list is empty.
+        emptyTime = System.currentTimeMillis();
+      }
+    }
+
+    //update renewal time
+    if (renewal == dfsc.hdfsTimeout/2) {
+      long min = FSConstants.LEASE_SOFTLIMIT_PERIOD;
+      for(DFSClient c : dfsclients) {
+        if (c.hdfsTimeout > 0) {
+          final long half = c.hdfsTimeout;
+          if (half < min) {
+            min = half;
+          }
+        }
+      }
+      renewal = min/2;
+    }
+  }
+
+  void interruptAndJoin() throws InterruptedException {
+    Daemon daemonCopy = null;
+    synchronized (this) {
+      if (isRunning()) {
+        daemon.interrupt();
+        daemonCopy = daemon;
+      }
+    }
+   
+    if (daemonCopy != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Wait for lease checker to terminate");
+      }
+      daemonCopy.join();
+    }
+  }
+
+  private void renew() throws IOException {
+    final List<DFSClient> copies;
+    synchronized(this) {
+      copies = new ArrayList<DFSClient>(dfsclients);
+    }
+    //sort the client names for finding out repeated names.
+    Collections.sort(copies, new Comparator<DFSClient>() {
+      @Override
+      public int compare(final DFSClient left, final DFSClient right) {
+        return left.clientName.compareTo(right.clientName);
+      }
+    });
+    String previousName = "";
+    for(int i = 0; i < copies.size(); i++) {
+      final DFSClient c = copies.get(i);
+      //skip if current client name is the same as the previous name.
+      if (!c.clientName.equals(previousName)) {
+        if (!c.renewLease()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Did not renew lease for client " +
+                c);
+          }
+          continue;
+        }
+
+        previousName = c.clientName;
+      }
+    }
+  }
+
+  /**
+   * Renew all leases every {@link #getRenewalTime()} ms until interrupted,
+   * superseded or expired; a renewal socket timeout aborts every client.
+   */
+  private void run(final int id) throws InterruptedException {
+    for(long lastRenewed = System.currentTimeMillis(); !Thread.interrupted();
+        Thread.sleep(getSleepPeriod())) {
+      if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
+        try {
+          renew();
+          lastRenewed = System.currentTimeMillis();
+        } catch (SocketTimeoutException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (getRenewalTime()/1000) + " seconds.  Aborting ...", ie);
+          synchronized (this) {
+            // c.abort() removes c from dfsclients, so iterate over a snapshot.
+            for(DFSClient c : new ArrayList<DFSClient>(dfsclients)) {
+              c.abort();
+            }
+          }
+          break;
+        } catch (IOException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (getRenewalTime()/1000) + " seconds.  Will retry shortly ...",
+              ie);
+        }
+      }
+
+      synchronized(this) {
+        if (id != currentId || isRenewerExpired()) {
+          //no longer the current daemon or expired
+          return;
+        }
+
+        // if no running clients remain registered with this renewer,
+        // start the grace period after which the daemon stops.
+        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+          emptyTime = System.currentTimeMillis();
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    String s = getClass().getSimpleName() + ":" + factorykey;
+    if (LOG.isTraceEnabled()) {
+      return s + ", clients=" +  clientsString() + ", "
+             + StringUtils.stringifyException(new Throwable("for testing"));
+    }
+    return s;
+  }
+
+  /** Get the names of all clients */
+  private synchronized String clientsString() {
+    if (dfsclients.isEmpty()) {
+      return "[]";
+    } else {
+      final StringBuilder b = new StringBuilder("[").append(
+          dfsclients.get(0).clientName);
+      for(int i = 1; i < dfsclients.size(); i++) {
+        b.append(", ").append(dfsclients.get(i).clientName);
+      }
+      return b.append("]").toString();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java Fri Nov  9 20:51:09 2012
@@ -165,7 +165,7 @@ public class AppendTestUtil {
     LOG.info("leasechecker.interruptAndJoin()");
     // lose the lease on the client
     DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
-    dfs.dfs.leasechecker.interruptAndJoin();
+    dfs.dfs.getLeaseRenewer().interruptAndJoin();
   }
   
   public static void recoverFile(MiniDFSCluster cluster, FileSystem fs,

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Nov  9 20:51:09 2012
@@ -46,6 +46,10 @@ import org.junit.Test;
 public class TestDistributedFileSystem {
   private static final Random RAN = new Random();
 
+  {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private boolean dualPortTesting = false;
   
   private Configuration getTestConfiguration() {
@@ -102,40 +106,108 @@ public class TestDistributedFileSystem {
   @Test
   public void testDFSClient() throws Exception {
     Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
     MiniDFSCluster cluster = null;
 
     try {
       cluster = new MiniDFSCluster(conf, 2, true, null);
-      final Path filepath = new Path("/test/LeaseChecker/foo");
+      final String filepathstring = "/test/LeaseChecker/foo";
+      final Path[] filepaths = new Path[4];
+      for(int i = 0; i < filepaths.length; i++) {
+        filepaths[i] = new Path(filepathstring + i);
+      }
       final long millis = System.currentTimeMillis();
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
   
-        //create a file
-        FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-  
-        //write something and close
-        out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-        out.close();
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        {
+          //create a file
+          final FSDataOutputStream out = dfs.create(filepaths[0]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //write something
+          out.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close
+          out.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.getLeaseRenewer().isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        }
+
+        {
+          //create file1
+          final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //create file2
+          final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+
+          //write something to file1
+          out1.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close file1
+          out1.close();
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+
+          //write something to file2
+          out2.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close file2
+          out2.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        }
+
+        {
+          //create file3
+          final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          Thread.sleep(grace/4*3);
+          //passed previous grace period, should still running
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //write something to file3
+          out3.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close file3
+          out3.close();
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.getLeaseRenewer().isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        }
+
         dfs.close();
       }
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
 
         //open and check the file
-        FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        FSDataInputStream in = dfs.open(filepaths[0]);
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
         in.close();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
         dfs.close();
       }
     }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Nov  9 20:51:09 2012
@@ -656,7 +656,7 @@ public class TestFileAppend4 extends Tes
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leasechecker.interruptAndJoin();
+      client.getLeaseRenewer().interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
@@ -726,7 +726,7 @@ public class TestFileAppend4 extends Tes
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leasechecker.interruptAndJoin();
+      client.getLeaseRenewer().interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java Fri Nov  9 20:51:09 2012
@@ -910,6 +910,53 @@ public class TestFileCreation {
       dfs.close();
     } finally {
       System.out.println("testFsClose successful");
+      cluster.shutdown();
+    }
+  }
+
+  // Test that closing a file throws an IOException once a datanode in its pipeline has been stopped.
+  public void testFsCloseAfterClusterShutdown() throws IOException {
+    System.out.println("test testFsCloseAfterClusterShutdown start");
+    final int DATANODE_NUM = 3;
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication.min", 3);
+    conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 seconds
+    conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 seconds
+
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem dfs = null;
+    try {
+      cluster.waitActive();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      final String f = DIR + "dhrubashutdown";
+      final Path fpath = new Path(f);
+      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
+      out.write("something_dhruba".getBytes());
+      out.sync();    // ensure that block is allocated
+
+      // shutdown last datanode in pipeline.
+      cluster.stopDataNode(2);
+
+      // close file. Since we have set the minReplication to 3 but have killed one
+      // of the three datanodes, the close call will loop until the hdfsTimeout is
+      // encountered.
+      boolean hasException = false;
+      try {
+        out.close();
+        System.out.println("testFsCloseAfterClusterShutdown: Error here"); // reached only if close() unexpectedly succeeded
+      } catch (IOException e) {
+        hasException = true;
+      }
+      assertTrue("Failed to close file after cluster shutdown", hasException);
+    } finally {
+      System.out.println("testFsCloseAfterClusterShutdown successful"); // NOTE: printed even when the test fails, because this runs in finally
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java Fri Nov  9 20:51:09 2012
@@ -17,13 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
-public class TestLease extends junit.framework.TestCase {
+public class TestLease {
   static boolean hasLease(MiniDFSCluster cluster, Path src) {
     return cluster.getNameNode().getNamesystem().leaseManager
         .getLeaseByPath(src.toString()) != null;
@@ -31,12 +39,13 @@ public class TestLease extends junit.fra
   
   final Path dir = new Path("/test/lease/");
 
+  @Test
   public void testLease() throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     try {
       FileSystem fs = cluster.getFileSystem();
-      assertTrue(fs.mkdirs(dir));
+      Assert.assertTrue(fs.mkdirs(dir));
       
       Path a = new Path(dir, "a");
       Path b = new Path(dir, "b");
@@ -44,24 +53,66 @@ public class TestLease extends junit.fra
       DataOutputStream a_out = fs.create(a);
       a_out.writeBytes("something");
 
-      assertTrue(hasLease(cluster, a));
-      assertTrue(!hasLease(cluster, b));
+      Assert.assertTrue(hasLease(cluster, a));
+      Assert.assertTrue(!hasLease(cluster, b));
       
       DataOutputStream b_out = fs.create(b);
       b_out.writeBytes("something");
 
-      assertTrue(hasLease(cluster, a));
-      assertTrue(hasLease(cluster, b));
+      Assert.assertTrue(hasLease(cluster, a));
+      Assert.assertTrue(hasLease(cluster, b));
 
       a_out.close();
       b_out.close();
 
-      assertTrue(!hasLease(cluster, a));
-      assertTrue(!hasLease(cluster, b));
+      Assert.assertTrue(!hasLease(cluster, a));
+      Assert.assertTrue(!hasLease(cluster, b));
       
       fs.delete(dir, true);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testFactory() throws Exception { // checks which DFSClients end up with equal LeaseRenewer instances, depending on the user they were created as
+    final String[] groups = new String[]{"supergroup"};
+    final UserGroupInformation[] ugi = new UserGroupInformation[3];
+    for(int i = 0; i < ugi.length; i++) {
+      ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups);
+    }
+
+    final Configuration conf = new Configuration();
+    final DFSClient c1 = createDFSClientAs(ugi[0], conf);
+    FSDataOutputStream out1 = createFsOut(c1, "/out1");
+    final DFSClient c2 = createDFSClientAs(ugi[0], conf);
+    FSDataOutputStream out2 = createFsOut(c2, "/out2");
+    Assert.assertEquals(c1.getLeaseRenewer(), c2.getLeaseRenewer()); // c1 and c2 were both created as user0: equal renewers expected
+    final DFSClient c3 = createDFSClientAs(ugi[1], conf);
+    FSDataOutputStream out3 = createFsOut(c3, "/out3");
+    Assert.assertTrue(c1.getLeaseRenewer() != c3.getLeaseRenewer()); // user0 vs user1: distinct renewer objects expected
+    final DFSClient c4 = createDFSClientAs(ugi[1], conf);
+    FSDataOutputStream out4 = createFsOut(c4, "/out4");
+    Assert.assertEquals(c3.getLeaseRenewer(), c4.getLeaseRenewer()); // c3 and c4 were both created as user1: equal renewers expected
+    final DFSClient c5 = createDFSClientAs(ugi[2], conf);
+    FSDataOutputStream out5 = createFsOut(c5, "/out5");
+    Assert.assertTrue(c1.getLeaseRenewer() != c5.getLeaseRenewer()); // user2's renewer differs from user0's
+    Assert.assertTrue(c3.getLeaseRenewer() != c5.getLeaseRenewer()); // ... and from user1's
+  }
+
+  private FSDataOutputStream createFsOut(DFSClient dfs, String path)
+      throws IOException {
+    return new FSDataOutputStream(dfs.create(path, true), null); // wraps the stream from dfs.create; NOTE(review): true/null are presumably overwrite and statistics - confirm
+  }
+  
+  static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class); // single Mockito mock handed to every client built by createDFSClientAs
+  static public DFSClient createDFSClientAs(UserGroupInformation ugi, // user under which the client is constructed
+      final Configuration conf) throws Exception {
+    return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() { // run the constructor inside ugi.doAs(...)
+      @Override
+      public DFSClient run() throws Exception {
+        return new DFSClient(null, mcp, conf, null);
+      }
+    });
+  }
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1407625&r1=1407624&r2=1407625&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Nov  9 20:51:09 2012
@@ -157,7 +157,7 @@ public class TestLeaseRecovery2 extends 
     stm.sync();
     if (triggerSoftLease) {
       AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leasechecker.interruptAndJoin();
+      dfs.dfs.getLeaseRenewer().interruptAndJoin();
     }
     return filepath;
   }



Mime
View raw message