hadoop-hdfs-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r943295 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
Date Tue, 11 May 2010 21:11:54 GMT
Author: dhruba
Date: Tue May 11 21:11:53 2010
New Revision: 943295

URL: http://svn.apache.org/viewvc?rev=943295&view=rev
Log:
HDFS-1141. Closing a file is successful only if the client still has a
valid lease. (Todd Lipcon via dhruba)
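
For context: after this change, ClientProtocol.complete() keeps its boolean
contract (false means not all blocks have reached minimum replication yet, so
the client should keep retrying), while lease problems now surface as an
IOException thrown by the NameNode instead of a generic failure status. Below
is a rough sketch of how a caller might consume these semantics; the helper,
retry count, and sleep interval are illustrative, not taken from DFSClient.
It assumes ClientProtocol and Block from org.apache.hadoop.hdfs.protocol.

    // Illustrative only: consuming the new complete() semantics.
    static void completeWithRetry(ClientProtocol namenode, String src,
                                  String clientName, Block last)
        throws IOException {
      for (int retry = 0; retry < 60; retry++) {
        // Throws IOException (e.g. a lease mismatch) when the close
        // must fail hard rather than be retried.
        if (namenode.complete(src, clientName, last)) {
          return;              // file is now closed on the NameNode
        }
        try {
          Thread.sleep(400);   // blocks not yet minimally replicated; retry
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while completing " + src);
        }
      }
      throw new IOException("Unable to complete write to " + src);
    }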


Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=943295&r1=943294&r2=943295&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue May 11 21:11:53 2010
@@ -875,6 +875,9 @@ Release 0.21.0 - Unreleased
 
     HDFS-1104. Fsck triggers full GC on NameNode. (hairong)
 
+    HDFS-1141. Closing a file is successful only if the client still has a
+    valid lease. (Todd Lipcon via dhruba)
+
 Release 0.20.3 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=943295&r1=943294&r2=943295&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue May 11 21:11:53 2010
@@ -1528,62 +1528,39 @@ public class FSNamesystem implements FSC
           + pendingFile.getClientName() + " but is accessed by " + holder);
     }
   }
-
+ 
   /**
-   * The FSNamesystem will already know the blocks that make up the file.
-   * Before we return, we make sure that all the file's blocks have 
-   * been reported by datanodes and are replicated correctly.
+   * Complete in-progress write to the given file.
+   * @return true if successful, false if the client should continue to retry
+   *         (e.g. if not all blocks have reached minimum replication yet)
+   * @throws IOException on error (e.g. lease mismatch, file not open, file deleted)
    */
-  
-  enum CompleteFileStatus {
-    OPERATION_FAILED,
-    STILL_WAITING,
-    COMPLETE_SUCCESS
-  }
-  
-  public CompleteFileStatus completeFile(String src, String holder, Block last) 
+  public boolean completeFile(String src, String holder, Block last) 
     throws IOException, UnresolvedLinkException {
-    CompleteFileStatus status = completeFileInternal(src, holder, last);
+    boolean success = completeFileInternal(src, holder, last);
     getEditLog().logSync();
-    return status;
+    return success;
   }
 
-  private synchronized CompleteFileStatus completeFileInternal(String src, 
+  private synchronized boolean completeFileInternal(String src, 
     String holder, Block last) throws IOException, UnresolvedLinkException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
     if (isInSafeMode())
       throw new SafeModeException("Cannot complete file " + src, safeMode);
-    INode iFile = dir.getFileINode(src);
-    INodeFileUnderConstruction pendingFile = null;
-    Block[] fileBlocks = null;
-
-    if (iFile != null && iFile.isUnderConstruction()) {
-      pendingFile = (INodeFileUnderConstruction) iFile;
-      fileBlocks =  dir.getFileBlocks(src);
-    }
-    if (fileBlocks == null ) {    
-      NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
-                                   + "failed to complete " + src
-                                   + " because dir.getFileBlocks() is null " + 
-                                   " and pendingFile is " + 
-                                   ((pendingFile == null) ? "null" : 
-                                     ("from " + pendingFile.getClientMachine()))
-                                  );                      
-      return CompleteFileStatus.OPERATION_FAILED;
-    } 
 
+    INodeFileUnderConstruction pendingFile = checkLease(src, holder);
     // commit the last block and complete it if it has minimum replicas
     blockManager.commitOrCompleteLastBlock(pendingFile, last);
 
     if (!checkFileProgress(pendingFile, true)) {
-      return CompleteFileStatus.STILL_WAITING;
+      return false;
     }
 
     finalizeINodeFileUnderConstruction(src, pendingFile);
 
     NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
                                   + " is closed by " + holder);
-    return CompleteFileStatus.COMPLETE_SUCCESS;
+    return true;
   }
 
   /** 

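The substance of this hunk: the hand-rolled null checks and the
CompleteFileStatus enum are gone, and completeFileInternal now goes through
checkLease (whose tail is visible in the context lines at the top of the
hunk). A deleted file or a holder that no longer owns the lease therefore
raises an exception with a specific message, which the new tests below assert
on. A rough sketch of what that check amounts to, reconstructed from the
context lines and the messages asserted in TestFileAppend4; treat the exact
messages and the LeaseExpiredException type as assumptions:

    // Sketch only: a simplified reconstruction of FSNamesystem.checkLease.
    private INodeFileUnderConstruction checkLease(String src, String holder)
        throws IOException {
      INode file = dir.getFileINode(src);
      if (file == null || !file.isUnderConstruction()) {
        // file was deleted or already closed: there is no lease to honor
        throw new LeaseExpiredException("No lease on " + src);
      }
      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) file;
      if (!pendingFile.getClientName().equals(holder)) {
        // another client recovered the lease; the old holder must fail
        throw new LeaseExpiredException("Lease mismatch on " + src
            + " owned by " + pendingFile.getClientName()
            + " but is accessed by " + holder);
      }
      return pendingFile;
    }
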
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=943295&r1=943294&r2=943295&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue May 11 21:11:53 2010
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.CompleteFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -708,16 +707,7 @@ public class NameNode implements Namenod
   public boolean complete(String src, String clientName, Block last)
       throws IOException {
     stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
-    CompleteFileStatus returnCode =
-      namesystem.completeFile(src, clientName, last);
-    if (returnCode == CompleteFileStatus.STILL_WAITING) {
-      return false;
-    } else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
-      return true;
-    } else {
-      throw new IOException("Could not complete write to file " + 
-                            src + " by " + clientName);
-    }
+    return namesystem.completeFile(src, clientName, last);
   }
 
   /**

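Design note: with the enum translation removed, any IOException raised by
FSNamesystem.completeFile() (safe mode, no lease, lease mismatch) now travels
unchanged over RPC to the DFSClient, preserving the server-side message text.
The tests added below rely on exactly this, matching on the "No lease on ..."
and "Lease mismatch" strings from the NameNode's exception.
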
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=943295&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java Tue May 11 21:11:53 2010
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/* File Append tests for HDFS-200 & HDFS-142, specifically focused on:
+ *  using append()/sync() to recover block information
+ */
+public class TestFileAppend4 {
+  static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
+  static final long BLOCK_SIZE = 1024;
+  static final long BBW_SIZE = 500; // don't align on bytes/checksum
+
+  static final Object [] NO_ARGS = new Object []{};
+
+  Configuration conf;
+  MiniDFSCluster cluster;
+  Path file1;
+  FSDataOutputStream stm;
+  boolean simulatedStorage = false;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+
+    // lower heartbeat interval for fast recognition of DN death
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+    // handle under-replicated blocks quickly (for replication asserts)
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    
+    // handle failures in the DFSClient pipeline quickly
+    // (for cluster.shutdown(); fs.close() idiom)
+    conf.setInt("ipc.client.connect.max.retries", 1);
+  }
+  
+  /*
+   * Recover the file.
+   * Try to open the file in append mode.
+   * Doing this, we get a hold of the file that the crashed writer
+   * was writing to.  Once we have it, close it.  This will
+   * allow a subsequent reader to see up to the last sync.
+   * NOTE: This is the same algorithm that HBase uses for file recovery
+   * @param fs
+   * @throws Exception
+   */
+  private void recoverFile(final FileSystem fs) throws Exception {
+    LOG.info("Recovering File Lease");
+
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery upon append request
+    cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+    // Trying recovery
+    int tries = 60;
+    boolean recovered = false;
+    FSDataOutputStream out = null;
+    while (!recovered && tries-- > 0) {
+      try {
+        out = fs.append(file1);
+        LOG.info("Successfully opened for appends");
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    if (out != null) {
+      out.close();
+    }
+    if (!recovered) {
+      fail("Recovery should take < 1 min");
+    }
+    LOG.info("Past lease recovery");
+  }
+  
+  /**
+   * Test case that stops a writer after finalizing a block but
+   * before calling completeFile, and then tries to recover
+   * the lease from another thread.
+   */
+  @Test(timeout=60000)
+  public void testRecoverFinalizedBlock() throws Throwable {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+ 
+    try {
+      cluster.waitActive();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+ 
+      // Delay completeFile
+      DelayAnswer delayer = new DelayAnswer();
+      doAnswer(delayer).when(spyNN).complete(
+          anyString(), anyString(), (Block)anyObject());
+ 
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      file1 = new Path("/testRecoverFinalized");
+      final OutputStream stm = client.create("/testRecoverFinalized", true);
+ 
+      // write 1/2 block
+      AppendTestUtil.write(stm, 0, 4096);
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread t = new Thread() { 
+          public void run() {
+            try {
+              stm.close();
+            } catch (Throwable t) {
+              err.set(t);
+            }
+          }};
+      t.start();
+      LOG.info("Waiting for close to get to latch...");
+      delayer.waitForCall();
+ 
+      // At this point, the block is finalized on the DNs, but the file
+      // has not been completed in the NN.
+      // Lose the leases
+      LOG.info("Killing lease checker");
+      client.leasechecker.interruptAndJoin();
+ 
+      FileSystem fs1 = cluster.getFileSystem();
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
+        fs1.getConf());
+ 
+      LOG.info("Recovering file");
+      recoverFile(fs2);
+ 
+      LOG.info("Telling close to proceed.");
+      delayer.proceed();
+      LOG.info("Waiting for close to finish.");
+      t.join();
+      LOG.info("Close finished.");
+ 
+      // We expect that close will get a "File is not open"
+      // error.
+      Throwable thrownByClose = err.get();
+      assertNotNull(thrownByClose);
+      assertTrue(thrownByClose instanceof IOException);
+      if (!thrownByClose.getMessage().contains(
+            "No lease on /testRecoverFinalized"))
+        throw thrownByClose;
+    } finally {
+      cluster.shutdown();
+    }
+  }
+ 
+  /**
+   * Test case that stops a writer after finalizing a block but
+   * before calling completeFile, recovers a file from another writer,
+   * starts writing from that writer, and then has the old lease holder
+   * call completeFile
+   */
+  @Test(timeout=60000)
+  public void testCompleteOtherLeaseHoldersFile() throws Throwable {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+ 
+    try {
+      cluster.waitActive();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+ 
+      // Delay completeFile
+      DelayAnswer delayer = new DelayAnswer();
+      doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), (Block)anyObject());
+ 
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      file1 = new Path("/testCompleteOtherLease");
+      final OutputStream stm = client.create("/testCompleteOtherLease", true);
+ 
+      // write 1/2 block
+      AppendTestUtil.write(stm, 0, 4096);
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread t = new Thread() { 
+          public void run() {
+            try {
+              stm.close();
+            } catch (Throwable t) {
+              err.set(t);
+            }
+          }};
+      t.start();
+      LOG.info("Waiting for close to get to latch...");
+      delayer.waitForCall();
+ 
+      // At this point, the block is finalized on the DNs, but the file
+      // has not been completed in the NN.
+      // Lose the leases
+      LOG.info("Killing lease checker");
+      client.leasechecker.interruptAndJoin();
+ 
+      FileSystem fs1 = cluster.getFileSystem();
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
+        fs1.getConf());
+ 
+      LOG.info("Recovering file");
+      recoverFile(fs2);
+ 
+      LOG.info("Opening file for append from new fs");
+      FSDataOutputStream appenderStream = fs2.append(file1);
+      
+      LOG.info("Writing some data from new appender");
+      AppendTestUtil.write(appenderStream, 0, 4096);
+      
+      LOG.info("Telling old close to proceed.");
+      delayer.proceed();
+      LOG.info("Waiting for close to finish.");
+      t.join();
+      LOG.info("Close finished.");
+ 
+      // We expect that close will get a "Lease mismatch"
+      // error.
+      Throwable thrownByClose = err.get();
+      assertNotNull(thrownByClose);
+      assertTrue(thrownByClose instanceof IOException);
+      if (!thrownByClose.getMessage().contains(
+            "Lease mismatch"))
+        throw thrownByClose;
+      
+      // The appender should be able to close properly
+      appenderStream.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }  
+ 
+  /**
+   * Mockito answer helper that triggers one latch as soon as the
+   * method is called, then waits on another before continuing.
+   */
+  @SuppressWarnings("unchecked")
+  private static class DelayAnswer implements Answer {
+    private final CountDownLatch fireLatch = new CountDownLatch(1);
+    private final CountDownLatch waitLatch = new CountDownLatch(1);
+ 
+    /**
+     * Wait until the method is called.
+     */
+    public void waitForCall() throws InterruptedException {
+      fireLatch.await();
+    }
+ 
+    /**
+     * Tell the method to proceed.
+     * This should only be called after waitForCall()
+     */
+    public void proceed() {
+      waitLatch.countDown();
+    }
+ 
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      LOG.info("DelayAnswer firing fireLatch");
+      fireLatch.countDown();
+      try {
+        LOG.info("DelayAnswer waiting on waitLatch");
+        waitLatch.await();
+        LOG.info("DelayAnswer delay complete");
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted waiting on latch", ie);
+      }
+      return invocation.callRealMethod();
+    }
+  }
+}
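
The DelayAnswer helper above is a reusable idiom: spy the target, park one
call on a latch, mutate state while it is blocked, then release it. Here is a
self-contained sketch of the same pattern against a made-up collaborator; the
Greeter class and every name below are hypothetical, not from this patch.

    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import java.util.concurrent.CountDownLatch;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class DelayAnswerDemo {
      /** Hypothetical collaborator, standing in for the spied NameNode. */
      public static class Greeter {
        public String greet(String name) { return "hello " + name; }
      }

      public static void main(String[] args) throws Exception {
        final CountDownLatch fired = new CountDownLatch(1);
        final CountDownLatch proceed = new CountDownLatch(1);

        final Greeter spied = spy(new Greeter());
        doAnswer(new Answer<Object>() {
          public Object answer(InvocationOnMock invocation) throws Throwable {
            fired.countDown();   // signal: the stubbed call has arrived
            proceed.await();     // park until the test releases it
            return invocation.callRealMethod();
          }
        }).when(spied).greet(anyString());

        Thread caller = new Thread() {
          public void run() {
            System.out.println(spied.greet("world"));
          }
        };
        caller.start();

        fired.await();           // the caller is now blocked inside greet()
        // ... a test would mutate shared state here ...
        proceed.countDown();     // let the stalled call finish
        caller.join();
      }
    }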


