hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1133108 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date Tue, 07 Jun 2011 17:41:52 GMT
Author: stack
Date: Tue Jun  7 17:41:51 2011
New Revision: 1133108

URL: http://svn.apache.org/viewvc?rev=1133108&view=rev
Log:
HDFS-1948 Forward port 'hdfs-1520 lightweight namenode operation to trigger lease reccovery'

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jun  7 17:41:51 2011
@@ -944,6 +944,9 @@ Release 0.22.0 - Unreleased
 
     HDFS-1619. Remove AC_TYPE* from the libhdfs. (Roman Shaposhnik via eli)
 
+    HDFS-1948  Forward port 'hdfs-1520 lightweight namenode operation to
+    trigger lease recovery' (stack)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jun  7 17:41:51 2011
@@ -539,6 +539,23 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Recover a file's lease
+   * @param src a file's path
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  boolean recoverLease(String src) throws IOException {
+    checkOpen();
+
+    try {
+      return namenode.recoverLease(src, clientName);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(FileNotFoundException.class,
+                                     AccessControlException.class);
+    }
+  }
+
+  /**
    * Get block location info about file
    * 
    * getBlockLocations() returns a list of hostnames that store 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Jun 
7 17:41:51 2011
@@ -218,6 +218,17 @@ public class DistributedFileSystem exten
     this.verifyChecksum = verifyChecksum;
   }
 
+  /** 
+   * Start the lease recovery of a file
+   *
+   * @param f a file
+   * @return true if the file is already closed
+   * @throws IOException if an error occurs
+   */
+  public boolean recoverLease(Path f) throws IOException {
+    return dfs.recoverLease(getPathName(f));
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Jun
 7 17:41:51 2011
@@ -537,6 +537,17 @@ public interface ClientProtocol extends 
   public void renewLease(String clientName) throws AccessControlException,
       IOException;
 
+  /**
+   * Start lease recovery.
+   * Lightweight NameNode operation to trigger lease recovery
+   * 
+   * @param src path of the file to start lease recovery
+   * @param clientName name of the current client
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  public boolean recoverLease(String src, String clientName) throws IOException;
+
   public int GET_STATS_CAPACITY_IDX = 0;
   public int GET_STATS_USED_IDX = 1;
   public int GET_STATS_REMAINING_IDX = 2;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue
Jun  7 17:41:51 2011
@@ -1395,62 +1395,7 @@ public class FSNamesystem implements FSC
 
     try {
       INode myFile = dir.getFileINode(src);
-      if (myFile != null && myFile.isUnderConstruction()) {
-        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
-        //
-        // If the file is under construction , then it must be in our
-        // leases. Find the appropriate lease record.
-        //
-        Lease lease = leaseManager.getLeaseByPath(src);
-        if (lease == null) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because pendingCreates is non-null but no leases found.");
-        }
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease.getHolder().equals(holder)) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because current leaseholder is trying to recreate file.");
-        }
-        assert lease.getHolder().equals(pendingFile.getClientName()) :
-          "Current lease holder " + lease.getHolder() +
-          " does not match file creator " + pendingFile.getClientName();
-        //
-        // Current lease holder is different from the requester.
-        // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery, otherwise fail.
-        //
-        if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          boolean isClosed = internalReleaseLease(lease, src, null);
-          if(!isClosed)
-            throw new RecoveryInProgressException(
-                "Failed to close file " + src +
-                ". Lease recovery is in progress. Try again later.");
-
-        } else {
-          BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
-          if(lastBlock != null && lastBlock.getBlockUCState() ==
-            BlockUCState.UNDER_RECOVERY) {
-            throw new RecoveryInProgressException(
-              "Recovery in progress, file [" + src + "], " +
-              "lease owner [" + lease.getHolder() + "]");
-            } else {
-              throw new AlreadyBeingCreatedException(
-                "Failed to create file [" + src + "] for [" + holder +
-                "] on client [" + clientMachine +
-                "], because this file is already being created by [" +
-                pendingFile.getClientName() + "] on [" +
-                pendingFile.getClientMachine() + "]");
-            }
-         }
-      }
+      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 
       try {
         blockManager.verifyReplication(src, replication, clientMachine);
@@ -1537,6 +1482,120 @@ public class FSNamesystem implements FSC
   }
 
   /**
+   * Recover lease;
+   * Immediately revoke the lease of the current lease holder and start lease
+   * recovery so that the file can be forced to be closed.
+   * 
+   * @param src the path of the file to start lease recovery
+   * @param holder the lease holder's name
+   * @param clientMachine the client machine's name
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  synchronized boolean recoverLease(String src, String holder, String clientMachine)
+  throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot recover the lease of " + src, safeMode);
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+
+    INode inode = dir.getFileINode(src);
+    if (inode == null) {
+      throw new FileNotFoundException("File not found " + src);
+    }
+
+    if (!inode.isUnderConstruction()) {
+      return true;
+    }
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    return false;
+  }
+
+  private void recoverLeaseInternal(INode fileInode, 
+      String src, String holder, String clientMachine, boolean force)
+  throws IOException {
+    if (fileInode != null && fileInode.isUnderConstruction()) {
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
+      //
+      // If the file is under construction , then it must be in our
+      // leases. Find the appropriate lease record.
+      //
+      Lease lease = leaseManager.getLease(holder);
+      //
+      // We found the lease for this file. And surprisingly the original
+      // holder is trying to recreate this file. This should never occur.
+      //
+      if (!force && lease != null) {
+        Lease leaseFile = leaseManager.getLeaseByPath(src);
+        if ((leaseFile != null && leaseFile.equals(lease)) ||
+            lease.getHolder().equals(holder)) { 
+          throw new AlreadyBeingCreatedException(
+            "failed to create file " + src + " for " + holder +
+            " on client " + clientMachine + 
+            " because current leaseholder is trying to recreate file.");
+        }
+      }
+      //
+      // Find the original holder.
+      //
+      lease = leaseManager.getLease(pendingFile.getClientName());
+      if (lease == null) {
+        throw new AlreadyBeingCreatedException(
+          "failed to create file " + src + " for " + holder +
+          " on client " + clientMachine + 
+          " because pendingCreates is non-null but no leases found.");
+      }
+      if (force) {
+        // close now: no need to wait for soft lease expiration and 
+        // close only the file src
+        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
+          " from client " + pendingFile.getClientName());
+        internalReleaseLease(lease, src, holder);
+      } else {
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
+        //
+        // If the original holder has not renewed in the last SOFTLIMIT 
+        // period, then start lease recovery.
+        //
+        if (lease.expiredSoftLimit()) {
+          LOG.info("startFile: recover lease " + lease + ", src=" + src +
+              " from client " + pendingFile.getClientName());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+        } else {
+          BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
+          if(lastBlock != null && lastBlock.getBlockUCState() ==
+            BlockUCState.UNDER_RECOVERY) {
+            throw new RecoveryInProgressException(
+              "Recovery in progress, file [" + src + "], " +
+              "lease owner [" + lease.getHolder() + "]");
+            } else {
+              throw new AlreadyBeingCreatedException(
+                "Failed to create file [" + src + "] for [" + holder +
+                "] on client [" + clientMachine +
+                "], because this file is already being created by [" +
+                pendingFile.getClientName() + "] on [" +
+                pendingFile.getClientMachine() + "]");
+            }
+         }
+      }
+    }
+
+  }
+
+  /**
    * Append to an existing file in the namespace.
    */
   LocatedBlock appendFile(String src, String holder, String clientMachine)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jun
 7 17:41:51 2011
@@ -836,6 +836,12 @@ public class NameNode implements Namenod
   }
 
   /** {@inheritDoc} */
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    return namesystem.recoverLease(src, clientName, clientMachine);
+  }
+
+  /** {@inheritDoc} */
   public boolean setReplication(String src, short replication) 
     throws IOException {  
     return namesystem.setReplication(src, replication);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1133108&r1=1133107&r2=1133108&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Tue Jun
 7 17:41:51 2011
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -27,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
@@ -95,7 +98,147 @@ public class TestLeaseRecovery2 {
     IOUtils.closeStream(dfs);
     if (cluster != null) {cluster.shutdown();}
   }
-  
+
+  /**
+   * Test the NameNode's revoke lease on current lease holder function.
+   * @throws Exception
+   */
+  @Test
+  public void testImmediateRecoveryOfLease() throws Exception {
+    //create a file
+    // write bytes into the file.
+    byte [] actual = new byte[FILE_SIZE];
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    Path filepath = createFile("/immediateRecoverLease-shortlease", size, true);
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery on next attempt to write-for-open.
+    cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);
+
+    recoverLeaseUsingCreate(filepath);
+    verifyFile(dfs, filepath, actual, size);
+
+    //test recoverLease
+    // set the soft limit to be 1 hour but recoverLease should
+    // close the file immediately
+    cluster.setLeasePeriod(LONG_LEASE_PERIOD, LONG_LEASE_PERIOD);
+    size = AppendTestUtil.nextInt(FILE_SIZE);
+    filepath = createFile("/immediateRecoverLease-longlease", size, false);
+
+    // test recoverLese from a different client
+    recoverLease(filepath, null);
+    verifyFile(dfs, filepath, actual, size);
+
+    // test recoverlease from the same client
+    size = AppendTestUtil.nextInt(FILE_SIZE);
+    filepath = createFile("/immediateRecoverLease-sameclient", size, false);
+
+    // create another file using the same client
+    Path filepath1 = new Path(filepath.toString() + AppendTestUtil.nextInt());
+    FSDataOutputStream stm = dfs.create(filepath1, true, BUF_SIZE,
+      REPLICATION_NUM, BLOCK_SIZE);
+
+    // recover the first file
+    recoverLease(filepath, dfs);
+    verifyFile(dfs, filepath, actual, size);
+
+    // continue to write to the second file
+    stm.write(buffer, 0, size);
+    stm.close();
+    verifyFile(dfs, filepath1, actual, size);
+  }
+
+  private Path createFile(final String filestr, final int size,
+      final boolean triggerLeaseRenewerInterrupt)
+  throws IOException, InterruptedException {
+    AppendTestUtil.LOG.info("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true, BUF_SIZE,
+      REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    AppendTestUtil.LOG.info("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+
+    if (triggerLeaseRenewerInterrupt) {
+      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+      dfs.dfs.leaserenewer.interruptAndJoin();
+    }
+    return filepath;
+  }
+
+  private void recoverLease(Path filepath, DistributedFileSystem dfs)
+  throws Exception {
+    if (dfs == null) {
+      dfs = (DistributedFileSystem)getFSAsAnotherUser(conf);
+    }
+
+    while (!dfs.recoverLease(filepath)) {
+      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+      Thread.sleep(5000);
+    }
+  }
+
+  private FileSystem getFSAsAnotherUser(final Configuration c)
+  throws IOException, InterruptedException {
+    return FileSystem.get(FileSystem.getDefaultUri(c), c,
+      UserGroupInformation.createUserForTesting(fakeUsername, 
+          new String [] {fakeGroup}).getUserName());
+  }
+
+  private void recoverLeaseUsingCreate(Path filepath)
+  throws IOException, InterruptedException {
+    FileSystem dfs2 = getFSAsAnotherUser(conf);
+
+    boolean done = false;
+    for(int i = 0; i < 10 && !done; i++) {
+      AppendTestUtil.LOG.info("i=" + i);
+      try {
+        dfs2.create(filepath, false, BUF_SIZE, (short)1, BLOCK_SIZE);
+        fail("Creation of an existing file should never succeed.");
+      } catch (IOException ioe) {
+        final String message = ioe.getMessage();
+        if (message.contains("file exists")) {
+          AppendTestUtil.LOG.info("done", ioe);
+          done = true;
+        }
+        else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
+          AppendTestUtil.LOG.info("GOOD! got " + message);
+        }
+        else {
+          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
+        }
+      }
+
+      if (!done) {
+        AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+        try {Thread.sleep(5000);} catch (InterruptedException e) {}
+      }
+    }
+    assertTrue(done);
+  }
+
+  private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
+      int size) throws IOException {
+    AppendTestUtil.LOG.info("Lease for file " +  filepath + " is recovered. "
+        + "Validating its contents now...");
+
+    // verify that file-size matches
+    assertTrue("File should be " + size + " bytes, but is actually " +
+               " found to be " + dfs.getFileStatus(filepath).getLen() +
+               " bytes",
+               dfs.getFileStatus(filepath).getLen() == size);
+
+    // verify that there is enough data to read.
+    System.out.println("File size is good. Now validating sizes from datanodes...");
+    FSDataInputStream stmin = dfs.open(filepath);
+    stmin.readFully(0, actual, 0, size);
+    stmin.close();
+  }
+
   /**
    * This test makes the client does not renew its lease and also
    * set the hard lease expiration period to be short 1s. Thus triggering



Mime
View raw message