hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brando...@apache.org
Subject git commit: HDFS-7259. Unresponsive NFS mount point due to deferred COMMIT response. Contributed by Brandon Li
Date Tue, 21 Oct 2014 18:03:25 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 3820bf055 -> 33e020cb1


HDFS-7259. Unresponsive NFS mount point due to deferred COMMIT response. Contributed by Brandon Li


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33e020cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33e020cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33e020cb

Branch: refs/heads/branch-2
Commit: 33e020cb191dc7264b7f683cff7d20980b6919c8
Parents: 3820bf0
Author: Brandon Li <brandonli@apache.org>
Authored: Tue Oct 21 10:20:29 2014 -0700
Committer: Brandon Li <brandonli@apache.org>
Committed: Tue Oct 21 11:01:27 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/nfs/nfs3/IdUserGroup.java |   4 +-
 .../hadoop/hdfs/nfs/conf/NfsConfigKeys.java     |   3 +
 .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java       |  81 ++++++++--
 .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java    |   2 +-
 .../apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java   |   2 +-
 .../hadoop/hdfs/nfs/nfs3/WriteManager.java      |  16 +-
 .../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java | 153 ++++++++++++++++++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 8 files changed, 237 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java
index b037413..c174533 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java
@@ -372,7 +372,7 @@ public class IdUserGroup {
       uid = getUid(user);
     } catch (IOException e) {
       uid = user.hashCode();
-      LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid, e);
+      LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid);
     }
     return uid;
   }
@@ -385,7 +385,7 @@ public class IdUserGroup {
       gid = getGid(group);
     } catch (IOException e) {
       gid = group.hashCode();
-      LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid, e);
+      LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid);
     }
     return gid;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java
index 2f65ce4..178d855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java
@@ -57,4 +57,7 @@ public class NfsConfigKeys {
 
   public static final String  AIX_COMPAT_MODE_KEY = "nfs.aix.compatibility.mode.enabled";
   public static final boolean AIX_COMPAT_MODE_DEFAULT = false;
+  
+  public final static String LARGE_FILE_UPLOAD = "nfs.large.file.upload";
+  public final static boolean LARGE_FILE_UPLOAD_DEFAULT = true;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index dc2f1b3..5e58187 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState;
 import org.apache.hadoop.io.BytesWritable.Comparator;
 import org.apache.hadoop.io.IOUtils;
@@ -77,7 +78,26 @@ class OpenFileCtx {
     COMMIT_INACTIVE_CTX,
     COMMIT_INACTIVE_WITH_PENDING_WRITE,
     COMMIT_ERROR,
-    COMMIT_DO_SYNC;
+    COMMIT_DO_SYNC,
+    /**
+     * A deferred COMMIT response can make a file upload fail. The following
+     * two statuses are introduced as a solution. 1. If the client asks to
+     * commit a non-sequential chunk of data, the NFS gateway returns success
+     * in the hope that the client will send the prerequisite writes. 2. If the
+     * client asks to commit a sequential chunk (meaning it can be flushed to
+     * HDFS), the NFS gateway returns the special error NFS3ERR_JUKEBOX, which
+     * tells the client to retry. Meanwhile, the NFS gateway keeps flushing
+     * data to HDFS and eventually syncs it.
+     * 
+     * The client is made to wait because we want it to wait for the last
+     * commit. Otherwise, the client thinks the file upload has finished (e.g.,
+     * the cp command returns success) while NFS could still be flushing staged
+     * data to HDFS. However, we don't know which commit is the last one, so we
+     * assume that a commit after sequential writes may be the last.
+     * See HDFS-7259 for more details.
+     * */
+    COMMIT_SPECIAL_WAIT, // scoped pending writes is sequential
+    COMMIT_SPECIAL_SUCCESS;// scoped pending writes is not sequential 
   }
 
   private final DFSClient client;
@@ -159,6 +179,7 @@ class OpenFileCtx {
   private RandomAccessFile raf;
   private final String dumpFilePath;
   private Daemon dumpThread;
+  private final boolean uploadLargeFile;
   
   private void updateLastAccessTime() {
     lastAccessTime = Time.monotonicNow();
@@ -200,12 +221,13 @@ class OpenFileCtx {
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
       String dumpFilePath, DFSClient client, IdUserGroup iug) {
-    this(fos, latestAttr, dumpFilePath, client, iug, false);
+    this(fos, latestAttr, dumpFilePath, client, iug, false,
+        new NfsConfiguration());
   }
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
       String dumpFilePath, DFSClient client, IdUserGroup iug,
-      boolean aixCompatMode) {
+      boolean aixCompatMode, NfsConfiguration config) {
     this.fos = fos;
     this.latestAttr = latestAttr;
     this.aixCompatMode = aixCompatMode;
@@ -235,6 +257,8 @@ class OpenFileCtx {
     dumpThread = null;
     this.client = client;
     this.iug = iug;
+    this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD,
+        NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT);
   }
 
   public Nfs3FileAttributes getLatestAttr() {
@@ -783,6 +807,11 @@ class OpenFileCtx {
         return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
       }
     }
+    if (pendingWrites.isEmpty()) {
+      // Note that, there is no guarantee data is synced. Caller should still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    }
 
     long flushed = 0;
     try {
@@ -795,6 +824,32 @@ class OpenFileCtx {
       LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
     }
 
+    // Handle large file upload
+    if (uploadLargeFile && !aixCompatMode) {
+      long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
+          .getKey().getMax() - 1;
+
+      if (co <= flushed) {
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
+      } else if (co < nextOffset.get()) {
+        if (!fromRead) {
+          // let client retry the same request, add pending commit to sync later
+          CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+              preOpAttr);
+          pendingCommits.put(commitOffset, commitCtx);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("return COMMIT_SPECIAL_WAIT");
+        }
+        return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("return COMMIT_SPECIAL_SUCCESS");
+        }
+        return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
+      }
+    }
+    
     if (commitOffset > 0) {
       if (aixCompatMode) {
         // The AIX NFS client misinterprets RFC-1813 and will always send 4096
@@ -825,20 +880,14 @@ class OpenFileCtx {
     Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
 
     // Commit whole file, commitOffset == 0
-    if (pendingWrites.isEmpty()) {
-      // Note that, there is no guarantee data is synced. TODO: We could still
-      // do a sync here though the output stream might be closed.
-      return COMMIT_STATUS.COMMIT_FINISHED;
-    } else {
-      if (!fromRead) {
-        // Insert commit
-        long maxOffset = key.getKey().getMax() - 1;
-        Preconditions.checkState(maxOffset > 0);
-        CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
-        pendingCommits.put(maxOffset, commitCtx);
-      }
-      return COMMIT_STATUS.COMMIT_WAIT;
+    if (!fromRead) {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
     }
+    return COMMIT_STATUS.COMMIT_WAIT;
   }
   
   private void addWrite(WriteCtx writeCtx) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
index 6012b9b..40a9043 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
@@ -944,7 +944,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       // Add open stream
       OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
           writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
-          aixCompatMode);
+          aixCompatMode, config);
       fileHandle = new FileHandle(postOpObjAttr.getFileId());
       if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.warn("Can't add more stream, close it."

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index 05e0fb7..3b5885e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -41,7 +41,7 @@ class WriteCtx {
   
   /**
    * In memory write data has 3 states. ALLOW_DUMP: not sequential write, still
-   * wait for prerequisit writes. NO_DUMP: sequential write, no need to dump
+   * wait for prerequisite writes. NO_DUMP: sequential write, no need to dump
    * since it will be written to HDFS soon. DUMPED: already dumped to a file.
    */
   public static enum DataState {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index 7bddc44..9bded49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -178,7 +178,7 @@ public class WriteManager {
       String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
           NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
+          + fileHandle.getFileId(), dfsClient, iug, aixCompatMode, config);
 
       if (!addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.info("Can't add new stream. Close it. Tell client to retry.");
@@ -236,6 +236,7 @@ public class WriteManager {
         status = Nfs3Status.NFS3ERR_IO;
         break;
       case COMMIT_WAIT:
+      case COMMIT_SPECIAL_WAIT:
         /**
          * This should happen rarely in some possible cases, such as read
          * request arrives before DFSClient is able to quickly flush data to DN,
@@ -244,6 +245,10 @@ public class WriteManager {
          */     
         status = Nfs3Status.NFS3ERR_JUKEBOX;
         break;
+      case COMMIT_SPECIAL_SUCCESS:
+        // Read beyond eof could result in partial read
+        status = Nfs3Status.NFS3_OK;
+        break;
       default:
         LOG.error("Should not get commit return code:" + ret.name());
         throw new RuntimeException("Should not get commit return code:"
@@ -278,6 +283,12 @@ public class WriteManager {
       case COMMIT_WAIT:
         // Do nothing. Commit is async now.
         return;
+      case COMMIT_SPECIAL_WAIT:
+        status = Nfs3Status.NFS3ERR_JUKEBOX;
+        break;
+      case COMMIT_SPECIAL_SUCCESS:
+        status = Nfs3Status.NFS3_OK;
+        break;
       default:
         LOG.error("Should not get commit return code:" + ret.name());
         throw new RuntimeException("Should not get commit return code:"
@@ -288,8 +299,7 @@ public class WriteManager {
     // Send out the response
     Nfs3FileAttributes postOpAttr = null;
     try {
-      String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileId());
-      postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+      postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug);
     } catch (IOException e1) {
       LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index 363bc12..c156fc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -138,8 +138,10 @@ public class TestWrites {
     HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
     Mockito.when(fos.getPos()).thenReturn((long) 0);
 
+    NfsConfiguration conf = new NfsConfiguration();
+    conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
     OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
-        new IdUserGroup(new NfsConfiguration()));
+        new IdUserGroup(conf), false, conf);
 
     COMMIT_STATUS ret;
 
@@ -157,6 +159,7 @@ public class TestWrites {
     // Test request with non zero commit offset
     ctx.setActiveStatusForTest(true);
     Mockito.when(fos.getPos()).thenReturn((long) 10);
+    ctx.setNextOffsetForTest(10);
     COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
     Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
     // Do_SYNC state will be updated to FINISHED after data sync
@@ -193,14 +196,84 @@ public class TestWrites {
   }
   
   @Test
+  // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
+  // large file upload option.
+  public void testCheckCommitLargeFileUpload() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+
+    NfsConfiguration conf = new NfsConfiguration();
+    conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup(conf), false, conf);
+
+    COMMIT_STATUS ret;
+
+    // Test inactive open file context
+    ctx.setActiveStatusForTest(false);
+    Channel ch = Mockito.mock(Channel.class);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
+
+    ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
+
+    // Test request with non zero commit offset
+    ctx.setActiveStatusForTest(true);
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    ctx.setNextOffsetForTest(10);
+    COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
+    Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+    // Do_SYNC state will be updated to FINISHED after data sync
+    ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+    
+    status = ctx.checkCommitInternal(10, ch, 1, attr, false);
+    Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+    ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+
+    ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
+        .getPendingCommitsForTest();
+    Assert.assertTrue(commits.size() == 0);
+    ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
+    Assert.assertTrue(commits.size() == 0);
+    
+    // Test request with zero commit offset
+    commits.remove(new Long(11));
+    // There is one pending write [5,10]
+    ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
+    
+    Mockito.when(fos.getPos()).thenReturn((long) 6);
+    ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
+    Assert.assertTrue(commits.size() == 1);
+    long key = commits.firstKey();
+    Assert.assertTrue(key == 8);
+
+    // Empty pending writes
+    ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+  }
+  
+  @Test
   public void testCheckCommitAixCompatMode() throws IOException {
     DFSClient dfsClient = Mockito.mock(DFSClient.class);
     Nfs3FileAttributes attr = new Nfs3FileAttributes();
     HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
 
-    // Last argument "true" here to enable AIX compatibility mode.
+    NfsConfiguration conf = new NfsConfiguration();
+    conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
+    // Enable AIX compatibility mode.
     OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
-        new IdUserGroup(new NfsConfiguration()), true);
+        new IdUserGroup(new NfsConfiguration()), true, conf);
     
     // Test fall-through to pendingWrites check in the event that commitOffset
     // is greater than the number of bytes we've so far flushed.
@@ -210,6 +283,8 @@ public class TestWrites {
     
     // Test the case when we actually have received more bytes than we're trying
     // to commit.
+    ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
     Mockito.when(fos.getPos()).thenReturn((long) 10);
     status = ctx.checkCommitInternal(5, null, 1, attr, false);
     Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
@@ -226,8 +301,9 @@ public class TestWrites {
     Mockito.when(fos.getPos()).thenReturn((long) 0);
     NfsConfiguration config = new NfsConfiguration();
 
+    config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
     OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
-        new IdUserGroup(config));
+        new IdUserGroup(config), false, config);
 
     FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
     COMMIT_STATUS ret;
@@ -285,6 +361,75 @@ public class TestWrites {
     assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
   }
   
+  @Test
+  // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
+  public void testCheckCommitFromReadLargeFileUpload() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+    NfsConfiguration config = new NfsConfiguration();
+
+    config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup(config), false, config);
+
+    FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
+    COMMIT_STATUS ret;
+    WriteManager wm = new WriteManager(new IdUserGroup(config), config, false);
+    assertTrue(wm.addOpenFileStream(h, ctx));
+    
+    // Test inactive open file context
+    ctx.setActiveStatusForTest(false);
+    Channel ch = Mockito.mock(Channel.class);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+    
+    ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
+    assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));
+    
+    // Test request with non zero commit offset
+    ctx.setActiveStatusForTest(true);
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
+    assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
+    // Do_SYNC state will be updated to FINISHED after data sync
+    ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
+ 
+    status = ctx.checkCommitInternal(10, ch, 1, attr, true);
+    assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+    ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
+
+    ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
+        .getPendingCommitsForTest();
+    assertTrue(commits.size() == 0);
+    ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
+    assertEquals(0, commits.size()); // commit triggered by read doesn't wait
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
+
+    // Test request with zero commit offset
+    // There is one pending write [5,10]
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(0, commits.size());
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+
+    // Empty pending writes
+    ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+  }
+  
   private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
       throws InterruptedException {
     int waitedTime = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33e020cb/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 761f889..cfe621f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -624,6 +624,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo)
 
+    HDFS-7259. Unresponsive NFS mount point due to deferred COMMIT response.
+    (brandonli)
+
     BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
   
       HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)


Mime
View raw message