hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1532967 [2/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/m...
Date Thu, 17 Oct 2013 05:33:01 GMT
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Oct 17 05:32:42 2013
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
@@ -28,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FileUtil;
@@ -42,8 +45,10 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.nfs.AccessPrivilege;
 import org.apache.hadoop.nfs.NfsExports;
+import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.NfsTime;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -65,10 +70,12 @@ import org.apache.hadoop.nfs.nfs3.reques
 import org.apache.hadoop.nfs.nfs3.request.READ3Request;
 import org.apache.hadoop.nfs.nfs3.request.READDIR3Request;
 import org.apache.hadoop.nfs.nfs3.request.READDIRPLUS3Request;
+import org.apache.hadoop.nfs.nfs3.request.READLINK3Request;
 import org.apache.hadoop.nfs.nfs3.request.REMOVE3Request;
 import org.apache.hadoop.nfs.nfs3.request.RENAME3Request;
 import org.apache.hadoop.nfs.nfs3.request.RMDIR3Request;
 import org.apache.hadoop.nfs.nfs3.request.SETATTR3Request;
+import org.apache.hadoop.nfs.nfs3.request.SYMLINK3Request;
 import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
 import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@@ -94,24 +101,31 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.RMDIR3Response;
 import org.apache.hadoop.nfs.nfs3.response.SETATTR3Response;
 import org.apache.hadoop.nfs.nfs3.response.SYMLINK3Response;
-import org.apache.hadoop.nfs.nfs3.response.VoidResponse;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcCallCache;
 import org.apache.hadoop.oncrpc.RpcDeniedReply;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.CredentialsSys;
 import org.apache.hadoop.oncrpc.security.Credentials;
-import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.CredentialsSys;
+import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.AccessControlException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
@@ -121,7 +135,7 @@ public class RpcProgramNfs3 extends RpcP
   public static final FsPermission umask = new FsPermission(
       (short) DEFAULT_UMASK);
   
-  private static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
+  static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
   private static final int MAX_READ_TRANSFER_SIZE = 64 * 1024;
   private static final int MAX_WRITE_TRANSFER_SIZE = 64 * 1024;
   private static final int MAX_READDIR_TRANSFER_SIZE = 64 * 1024;
@@ -146,14 +160,15 @@ public class RpcProgramNfs3 extends RpcP
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
   
+  private final RpcCallCache rpcCallCache;
+
   public RpcProgramNfs3() throws IOException {
     this(new Configuration());
   }
 
-  public RpcProgramNfs3(Configuration config)
-      throws IOException {
+  public RpcProgramNfs3(Configuration config) throws IOException {
     super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
+        Nfs3Constant.VERSION, Nfs3Constant.VERSION);
    
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup();
@@ -179,6 +194,8 @@ public class RpcProgramNfs3 extends RpcP
     } else {
       clearDirectory(writeDumpDir);
     }
+
+    rpcCallCache = new RpcCallCache("NFS3", 256);
   }
 
   private void clearDirectory(String writeDumpDir) throws IOException {
@@ -205,12 +222,12 @@ public class RpcProgramNfs3 extends RpcP
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS NULL");
     }
-    return new VoidResponse(Nfs3Status.NFS3_OK);
+    return new NFS3Response(Nfs3Status.NFS3_OK);
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -290,8 +307,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -366,8 +383,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -428,8 +445,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -476,9 +493,70 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READLINK3Response readlink(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
-    return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
+  public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
+    READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
+
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    if (dfsClient == null) {
+      response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
+      return response;
+    }
+
+    READLINK3Request request = null;
+
+    try {
+      request = new READLINK3Request(xdr);
+    } catch (IOException e) {
+      LOG.error("Invalid READLINK request");
+      return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
+    }
+
+    FileHandle handle = request.getHandle();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NFS READLINK fileId: " + handle.getFileId());
+    }
+
+    String fileIdPath = Nfs3Utils.getFileIdPath(handle);
+    try {
+      String target = dfsClient.getLinkTarget(fileIdPath);
+
+      Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient,
+          fileIdPath, iug);
+      if (postOpAttr == null) {
+        LOG.info("Can't get path for fileId:" + handle.getFileId());
+        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
+      }
+      if (postOpAttr.getType() != NfsFileType.NFSLNK.toValue()) {
+        LOG.error("Not a symlink, fileId:" + handle.getFileId());
+        return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
+      }
+      if (target == null) {
+        LOG.error("Symlink target should not be null, fileId:"
+            + handle.getFileId());
+        return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
+      }
+      if (MAX_READ_TRANSFER_SIZE < target.getBytes().length) {
+        return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr, null);
+      }
+
+      return new READLINK3Response(Nfs3Status.NFS3_OK, postOpAttr,
+          target.getBytes());
+
+    } catch (IOException e) {
+      LOG.warn("Readlink error: " + e.getClass(), e);
+      if (e instanceof FileNotFoundException) {
+        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
+      } else if (e instanceof AccessControlException) {
+        return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
+      }
+      return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+    }
   }
 
   @Override
@@ -509,7 +587,6 @@ public class RpcProgramNfs3 extends RpcP
     long offset = request.getOffset();
     int count = request.getCount();
 
-    
     FileHandle handle = request.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@@ -655,8 +732,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public CREATE3Response create(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -765,7 +842,7 @@ public class RpcProgramNfs3 extends RpcP
     
     // Add open stream
     OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
-        + "/" + postOpObjAttr.getFileId());
+        + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
     fileHandle = new FileHandle(postOpObjAttr.getFileId());
     writeManager.addOpenFileStream(fileHandle, openFileCtx);
     if (LOG.isDebugEnabled()) {
@@ -908,8 +985,7 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
             preOpDirAttr);
@@ -991,8 +1067,7 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
       }
@@ -1033,8 +1108,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1121,18 +1196,96 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SYMLINK3Response symlink(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
-    return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
+  public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
+    SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
+
+    if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      response.setStatus(Nfs3Status.NFS3ERR_ACCES);
+      return response;
+    }
+
+    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    if (dfsClient == null) {
+      response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
+      return response;
+    }
+
+    SYMLINK3Request request = null;
+    try {
+      request = new SYMLINK3Request(xdr);
+    } catch (IOException e) {
+      LOG.error("Invalid SYMLINK request");
+      response.setStatus(Nfs3Status.NFS3ERR_INVAL);
+      return response;
+    }
+
+    FileHandle dirHandle = request.getHandle();
+    String name = request.getName();
+    String symData = request.getSymData();
+    String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle);
+    // Don't do any name check to source path, just leave it to HDFS
+    String linkIdPath = linkDirIdPath + "/" + name;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath);
+    }
+
+    try {
+      WccData dirWcc = response.getDirWcc();
+      WccAttr preOpAttr = Nfs3Utils.getWccAttr(dfsClient, linkDirIdPath);
+      dirWcc.setPreOpAttr(preOpAttr);
+
+      dfsClient.createSymlink(symData, linkIdPath, false);
+      // Set symlink attr is considered as to change the attr of the target
+      // file. So no need to set symlink attr here after it's created.
+
+      HdfsFileStatus linkstat = dfsClient.getFileLinkInfo(linkIdPath);
+      Nfs3FileAttributes objAttr = Nfs3Utils.getNfs3FileAttrFromFileStatus(
+          linkstat, iug);
+      dirWcc
+          .setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug));
+
+      return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle(
+          objAttr.getFileid()), objAttr, dirWcc);
+
+    } catch (IOException e) {
+      LOG.warn("Exception:" + e);
+      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      return response;
+    }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
+  /**
+   * Used by readdir and readdirplus to get dirents. It retries the listing if
+   * the startAfter can't be found anymore.
+   */
+  private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
+      byte[] startAfter) throws IOException {
+    DirectoryListing dlisting = null;
+    try {
+      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
+    } catch (RemoteException e) {
+      IOException io = e.unwrapRemoteException();
+      if (!(io instanceof DirectoryListingStartAfterNotFoundException)) {
+        throw io;
+      }
+      // This happens when startAfter was just deleted
+      LOG.info("Cookie cound't be found: " + new String(startAfter)
+          + ", do listing from beginning");
+      dlisting = dfsClient
+          .listPaths(dirFileIdPath, HdfsFileStatus.EMPTY_NAME);
+    }
+    return dlisting;
+  }
+  
   @Override
-  public READDIR3Response readdir(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1170,7 +1323,7 @@ public class RpcProgramNfs3 extends RpcP
           + cookie + " count: " + count);
     }
 
-    HdfsFileStatus dirStatus;
+    HdfsFileStatus dirStatus = null;
     DirectoryListing dlisting = null;
     Nfs3FileAttributes postOpAttr = null;
     long dotdotFileId = 0;
@@ -1214,8 +1367,8 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+      
+      dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpAttr == null) {
         LOG.error("Can't get path for fileId:" + handle.getFileId());
@@ -1298,11 +1451,15 @@ public class RpcProgramNfs3 extends RpcP
     }
     long dirCount = request.getDirCount();
     if (dirCount <= 0) {
-      LOG.info("Nonpositive count in invalid READDIRPLUS request:" + dirCount);
-      return new READDIRPLUS3Response(Nfs3Status.NFS3_OK);
+      LOG.info("Nonpositive dircount in invalid READDIRPLUS request:" + dirCount);
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
     }
     int maxCount = request.getMaxCount();
-
+    if (maxCount <= 0) {
+      LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
+    }
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
           + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1352,8 +1509,8 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+      
+      dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpDirAttr == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
@@ -1421,8 +1578,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1479,8 +1636,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1531,8 +1688,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1578,7 +1735,7 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr,
+  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
@@ -1620,18 +1777,10 @@ public class RpcProgramNfs3 extends RpcP
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
       
-      int status;
-      if (writeManager.handleCommit(handle, commitOffset)) {
-        status = Nfs3Status.NFS3_OK;
-      } else {
-        status = Nfs3Status.NFS3ERR_IO;
-      }
-      Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
-          handle, iug);
-      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(status, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
-
+      // Insert commit as an async request
+      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
+          preOpAttr);
+      return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       Nfs3FileAttributes postOpAttr = null;
@@ -1657,24 +1806,53 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress())
+        .getAddress();
+    Channel channel = info.channel();
 
     Credentials credentials = rpcCall.getCredential();
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
-      if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
-          && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
-        LOG.info("Wrong RPC AUTH flavor, "
-            + rpcCall.getCredential().getFlavor()
+      if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
+          && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
+        LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
             + " is not AUTH_SYS or RPCSEC_GSS.");
         XDR reply = new XDR();
-        reply = RpcDeniedReply.voidReply(reply, xid,
+        RpcDeniedReply rdr = new RpcDeniedReply(xid,
             RpcReply.ReplyState.MSG_ACCEPTED,
-            RpcDeniedReply.RejectState.AUTH_ERROR);
-        return reply;
+            RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
+        rdr.write(reply);
+
+        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+            .buffer());
+        RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+        RpcUtil.sendRpcResponse(ctx, rsp);
+        return;
+      }
+    }
+
+    if (!isIdempotent(rpcCall)) {
+      RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
+          xid);
+      if (entry != null) { // in cache
+        if (entry.isCompleted()) {
+          LOG.info("Sending the cached reply to retransmitted request " + xid);
+          RpcUtil.sendRpcResponse(ctx, entry.getResponse());
+          return;
+        } else { // else request is in progress
+          LOG.info("Retransmitted request, transaction still in progress "
+              + xid);
+          // Ignore the request and do nothing
+          return;
+        }
       }
     }
     
@@ -1695,9 +1873,19 @@ public class RpcProgramNfs3 extends RpcP
     } else if (nfsproc3 == NFSPROC3.READLINK) {
       response = readlink(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.READ) {
+      if (LOG.isDebugEnabled()) {
+          LOG.debug(Nfs3Utils.READ_RPC_START + xid);
+      }    
       response = read(xdr, securityHandler, client);
+      if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
+        LOG.debug(Nfs3Utils.READ_RPC_END + xid);
+      }
     } else if (nfsproc3 == NFSPROC3.WRITE) {
+      if (LOG.isDebugEnabled()) {
+          LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
+      }
       response = write(xdr, channel, xid, securityHandler, client);
+      // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
       response = create(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
@@ -1725,16 +1913,31 @@ public class RpcProgramNfs3 extends RpcP
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
       response = pathconf(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, securityHandler, client);
+      response = commit(xdr, channel, xid, securityHandler, client);
     } else {
       // Invalid procedure
-      RpcAcceptedReply.voidReply(out, xid,
-          RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+      RpcAcceptedReply.getInstance(xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
+          out);
+    }
+    if (response == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No sync response, expect an async response for request XID="
+            + rpcCall.getXid());
+      }
+      return;
     }
-    if (response != null) {
-      out = response.send(out, xid);
+    // TODO: currently we just return VerifierNone
+    out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+        .buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+
+    if (!isIdempotent(rpcCall)) {
+      rpcCallCache.callCompleted(client, xid, rsp);
     }
-    return out;
+
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Thu Oct 17 05:32:42 2013
@@ -20,13 +20,18 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * WriteCtx saves the context of one write request, such as request, channel,
  * xid and reply status.
@@ -48,14 +53,31 @@ class WriteCtx {
   private final FileHandle handle;
   private final long offset;
   private final int count;
+  
+  //Only needed for overlapped write, referring OpenFileCtx.addWritesToCache()  
+  private final int originalCount; 
+  public static final int INVALID_ORIGINAL_COUNT = -1;
+  
+  public int getOriginalCount() {
+    return originalCount;
+  }
+
   private final WriteStableHow stableHow;
-  private byte[] data;
+  private volatile ByteBuffer data;
   
   private final Channel channel;
   private final int xid;
   private boolean replied;
 
-  private DataState dataState;
+  /** 
+   * Data belonging to the same {@link OpenFileCtx} may be dumped to a file. 
+   * After being dumped to the file, the corresponding {@link WriteCtx} records 
+   * the dump file and the offset.  
+   */
+  private RandomAccessFile raf;
+  private long dumpFileOffset;
+  
+  private volatile DataState dataState;
 
   public DataState getDataState() {
     return dataState;
@@ -64,12 +86,13 @@ class WriteCtx {
   public void setDataState(DataState dataState) {
     this.dataState = dataState;
   }
-
-  private RandomAccessFile raf;
-  private long dumpFileOffset;
   
-  // Return the dumped data size
-  public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
+  /** 
+   * Writing the data into a local file. After the writing, if 
+   * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set 
+   * {@link #dataState} to DUMPED.
+   */
+  long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
       throws IOException {
     if (dataState != DataState.ALLOW_DUMP) {
       if (LOG.isTraceEnabled()) {
@@ -78,54 +101,104 @@ class WriteCtx {
       }
       return 0;
     }
+
+    // Resized write should not allow dump
+    Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
+
     this.raf = raf;
     dumpFileOffset = dumpOut.getChannel().position();
-    dumpOut.write(data, 0, count);
+    dumpOut.write(data.array(), 0, count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
-    data = null;
-    dataState = DataState.DUMPED;
-    return count;
+    // it is possible that while we dump the data, the data is also being
+    // written back to HDFS. After dump, if the writing back has not finished
+    // yet, we change its flag to DUMPED and set the data to null. Otherwise
+    // this WriteCtx instance should have been removed from the buffer.
+    if (dataState == DataState.ALLOW_DUMP) {
+      synchronized (this) {
+        if (dataState == DataState.ALLOW_DUMP) {
+          data = null;
+          dataState = DataState.DUMPED;
+          return count;
+        }
+      }
+    }
+    return 0;
   }
 
-  public FileHandle getHandle() {
+  FileHandle getHandle() {
     return handle;
   }
   
-  public long getOffset() {
+  long getOffset() {
     return offset;
   }
 
-  public int getCount() {
+  int getCount() {
     return count;
   }
 
-  public WriteStableHow getStableHow() {
+  WriteStableHow getStableHow() {
     return stableHow;
   }
 
-  public byte[] getData() throws IOException {
+  @VisibleForTesting
+  ByteBuffer getData() throws IOException {
     if (dataState != DataState.DUMPED) {
-      if (data == null) {
-        throw new IOException("Data is not dumpted but has null:" + this);
-      }
-    } else {
-      // read back
-      if (data != null) {
-        throw new IOException("Data is dumpted but not null");
-      }
-      data = new byte[count];
-      raf.seek(dumpFileOffset);
-      int size = raf.read(data, 0, count);
-      if (size != count) {
-        throw new IOException("Data count is " + count + ", but read back "
-            + size + "bytes");
+      synchronized (this) {
+        if (dataState != DataState.DUMPED) {
+          Preconditions.checkState(data != null);
+          return data;
+        }
       }
     }
+    // read back from dumped file
+    this.loadData();
     return data;
   }
 
+  private void loadData() throws IOException {
+    Preconditions.checkState(data == null);
+    byte[] rawData = new byte[count];
+    raf.seek(dumpFileOffset);
+    int size = raf.read(rawData, 0, count);
+    if (size != count) {
+      throw new IOException("Data count is " + count + ", but read back "
+          + size + "bytes");
+    }
+    data = ByteBuffer.wrap(rawData);
+  }
+
+  public void writeData(HdfsDataOutputStream fos) throws IOException {
+    Preconditions.checkState(fos != null);
+
+    ByteBuffer dataBuffer = null;
+    try {
+      dataBuffer = getData();
+    } catch (Exception e1) {
+      LOG.error("Failed to get request data offset:" + offset + " count:"
+          + count + " error:" + e1);
+      throw new IOException("Can't get WriteCtx.data");
+    }
+
+    byte[] data = dataBuffer.array();
+    int position = dataBuffer.position();
+    int limit = dataBuffer.limit();
+    Preconditions.checkState(limit - position == count);
+    // Modified write has a valid original count
+    if (position != 0) {
+      if (limit != getOriginalCount()) {
+        throw new IOException("Modified write has differnt original size."
+            + "buff position:" + position + " buff limit:" + limit + ". "
+            + toString());
+      }
+    }
+    
+    // Now write data
+    fos.write(data, position, count);
+  }
+  
   Channel getChannel() {
     return channel;
   }
@@ -142,11 +215,13 @@ class WriteCtx {
     this.replied = replied;
   }
   
-  WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
-      byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
+  WriteCtx(FileHandle handle, long offset, int count, int originalCount,
+      WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
+      boolean replied, DataState dataState) {
     this.handle = handle;
     this.offset = offset;
     this.count = count;
+    this.originalCount = originalCount;
     this.stableHow = stableHow;
     this.data = data;
     this.channel = channel;
@@ -159,7 +234,7 @@ class WriteCtx {
   @Override
   public String toString() {
     return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
-        + " stableHow:" + stableHow + " replied:" + replied + " dataState:"
-        + dataState + " xid:" + xid;
+        + " originalCount:" + originalCount + " stableHow:" + stableHow
+        + " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Oct 17 05:32:42 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -36,9 +37,11 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
@@ -66,8 +69,8 @@ public class WriteManager {
    */
   private long streamTimeout;
   
-  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
-  public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
+  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
+  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
   
   void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
     openFileMap.put(h, ctx);
@@ -118,7 +121,8 @@ public class WriteManager {
     byte[] data = request.getData().array();
     if (data.length < count) {
       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+      Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+          new XDR(), xid, new VerifierNone()), xid);
       return;
     }
 
@@ -155,7 +159,8 @@ public class WriteManager {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
             fileWcc, count, request.getStableHow(),
             Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
         return;
       }
 
@@ -163,7 +168,7 @@ public class WriteManager {
       String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
           Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId());
+          + fileHandle.getFileId(), dfsClient, iug);
       addOpenFileStream(fileHandle, openFileCtx);
       if (LOG.isDebugEnabled()) {
         LOG.debug("opened stream for file:" + fileHandle.getFileId());
@@ -173,65 +178,55 @@ public class WriteManager {
     // Add write into the async job queue
     openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
         asyncDataService, iug);
-    // Block stable write
-    if (request.getStableHow() != WriteStableHow.UNSTABLE) {
-      if (handleCommit(fileHandle, offset + count)) {
-        Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
-        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
-            postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, request.getStableHow(),
-            Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      } else {
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      }
-    }
-
     return;
   }
 
-  boolean handleCommit(FileHandle fileHandle, long commitOffset) {
+  void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
+      long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    int status;
     OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
-          + " commitOffset=" + commitOffset);
-      return true;
-    }
-    long timeout = 30 * 1000; // 30 seconds
-    long startCommit = System.currentTimeMillis();
-    while (true) {
-      int ret = openFileCtx.checkCommit(commitOffset);
-      if (ret == OpenFileCtx.COMMIT_FINISHED) {
-        // Committed
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
-        LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return true;
-      }
-      assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
-      if (ret == OpenFileCtx.COMMIT_ERROR) {
-        return false;
-      }
+          + " commitOffset=" + commitOffset + ". Return success in this case.");
+      status = Nfs3Status.NFS3_OK;
       
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-      }
-      if (System.currentTimeMillis() - startCommit > timeout) {
-        // Commit took too long, return error
-        return false;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return false;
+    } else {
+      COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
+          channel, xid, preOpAttr);
+      switch (ret) {
+      case COMMIT_DO_SYNC:
+      case COMMIT_FINISHED:
+      case COMMIT_INACTIVE_CTX:
+        status = Nfs3Status.NFS3_OK;
+        break;
+      case COMMIT_INACTIVE_WITH_PENDING_WRITE:
+      case COMMIT_ERROR:
+        status = Nfs3Status.NFS3ERR_IO;
+        break;
+      case COMMIT_WAIT:
+        // Do nothing. Commit is async now.
+        return;
+      default:
+        throw new RuntimeException("Should not get commit return code:"
+            + ret.name());
       }
-    }// while
+    }
+    
+    // Send out the response
+    Nfs3FileAttributes postOpAttr = null;
+    try {
+      String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileid());
+      postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+    } catch (IOException e1) {
+      LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileid());
+    }
+    WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
+    COMMIT3Response response = new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
+    Nfs3Utils.writeChannelCommit(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Thu Oct 17 05:32:42 2013
@@ -33,11 +33,13 @@ import org.apache.hadoop.nfs.nfs3.reques
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
 import org.apache.hadoop.oncrpc.RegistrationClient;
 import org.apache.hadoop.oncrpc.RpcCall;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpClient;
 import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -58,15 +60,9 @@ public class TestOutOfOrderWrite {
 
   static XDR create() {
     XDR request = new XDR();
-    RpcCall.write(request, 0x8000004c, Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.NFSPROC3.CREATE.getValue());
-
-    // credentials
-    request.writeInt(0); // auth null
-    request.writeInt(0); // length zero
-    // verifier
-    request.writeInt(0); // auth null
-    request.writeInt(0); // length zero
+    RpcCall.getInstance(0x8000004c, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
+        Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(),
+        new VerifierNone()).write(request);
 
     SetAttr3 objAttr = new SetAttr3();
     CREATE3Request createReq = new CREATE3Request(new FileHandle("/"),
@@ -78,15 +74,10 @@ public class TestOutOfOrderWrite {
   static XDR write(FileHandle handle, int xid, long offset, int count,
       byte[] data) {
     XDR request = new XDR();
-    RpcCall.write(request, xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
-        Nfs3Constant.NFSPROC3.WRITE.getValue());
+    RpcCall.getInstance(xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
+        Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(),
+        new VerifierNone()).write(request);
 
-    // credentials
-    request.writeInt(0); // auth null
-    request.writeInt(0); // length zero
-    // verifier
-    request.writeInt(0); // auth null
-    request.writeInt(0); // length zero
     WRITE3Request write1 = new WRITE3Request(handle, offset, count,
         WriteStableHow.UNSTABLE, ByteBuffer.wrap(data));
     write1.serialize(request);
@@ -145,8 +136,9 @@ public class TestOutOfOrderWrite {
     protected ChannelPipelineFactory setPipelineFactory() {
       this.pipelineFactory = new ChannelPipelineFactory() {
         public ChannelPipeline getPipeline() {
-          return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
-              request));
+          return Channels.pipeline(
+              RpcUtil.constructRpcFrameDecoder(),
+              new WriteHandler(request));
         }
       };
       return this.pipelineFactory;
@@ -174,11 +166,11 @@ public class TestOutOfOrderWrite {
     XDR writeReq;
 
     writeReq = write(handle, 0x8000005c, 2000, 1000, data3);
-    Nfs3Utils.writeChannel(channel, writeReq);
+    Nfs3Utils.writeChannel(channel, writeReq, 1);
     writeReq = write(handle, 0x8000005d, 1000, 1000, data2);
-    Nfs3Utils.writeChannel(channel, writeReq);
+    Nfs3Utils.writeChannel(channel, writeReq, 2);
     writeReq = write(handle, 0x8000005e, 0, 1000, data1);
-    Nfs3Utils.writeChannel(channel, writeReq);
+    Nfs3Utils.writeChannel(channel, writeReq, 3);
 
     // TODO: convert to Junit test, and validate result automatically
   }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java Thu Oct 17 05:32:42 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.oncrpc.RegistrationClient;
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
 
@@ -78,11 +80,8 @@ public class TestPortmapRegister {
   
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {
     // TODO: Move this to RpcRequest
-    RpcCall.write(xdr_out, 0, 100000, 2, procedure);
-    xdr_out.writeInt(0); //no auth
-    xdr_out.writeInt(0);
-    xdr_out.writeInt(0);
-    xdr_out.writeInt(0);
+    RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
+        new VerifierNone()).write(xdr_out);
     
     /*
     xdr_out.putInt(1); //unix auth

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java Thu Oct 17 05:32:42 2013
@@ -27,6 +27,8 @@ import java.net.UnknownHostException;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 
 // TODO: convert this to Junit
 public class TestUdpServer {
@@ -82,7 +84,8 @@ public class TestUdpServer {
   
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {
     // Make this a method
-    RpcCall.write(xdr_out, 0, 100000, 2, procedure);
+    RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
+        new VerifierNone()).write(xdr_out);
   }
  
   static void testGetportMount() {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Thu Oct 17 05:32:42 2013
@@ -17,41 +17,44 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestDFSClientCache {
   @Test
-  public void testLruTable() throws IOException {
-    DFSClientCache cache = new DFSClientCache(new Configuration(), 3);
-    DFSClient client = Mockito.mock(DFSClient.class);
-    cache.put("a", client);
-    assertTrue(cache.containsKey("a"));
-
-    cache.put("b", client);
-    cache.put("c", client);
-    cache.put("d", client);
-    assertTrue(cache.usedSize() == 3);
-    assertFalse(cache.containsKey("a"));
-
-    // Cache should have d,c,b in LRU order
-    assertTrue(cache.containsKey("b"));
-    // Do a lookup to make b the most recently used
-    assertTrue(cache.get("b") != null);
-
-    cache.put("e", client);
-    assertTrue(cache.usedSize() == 3);
-    // c should be replaced with e, and cache has e,b,d
-    assertFalse(cache.containsKey("c"));
-    assertTrue(cache.containsKey("e"));
-    assertTrue(cache.containsKey("b"));
-    assertTrue(cache.containsKey("d"));
+  public void testEviction() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
+
+    // Only one entry will be in the cache
+    final int MAX_CACHE_SIZE = 2;
+
+    DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
+
+    DFSClient c1 = cache.get("test1");
+    assertTrue(cache.get("test1").toString().contains("ugi=test1"));
+    assertEquals(c1, cache.get("test1"));
+    assertFalse(isDfsClientClose(c1));
+
+    cache.get("test2");
+    assertTrue(isDfsClientClose(c1));
+    assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
+  }
+
+  private static boolean isDfsClientClose(DFSClient c) {
+    try {
+      c.exists("");
+    } catch (IOException e) {
+      return e.getMessage().equals("Filesystem closed");
+    }
+    return false;
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java Thu Oct 17 05:32:42 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 
@@ -51,8 +52,9 @@ public class TestOffsetRange {
     OffsetRange r3 = new OffsetRange(1, 3);
     OffsetRange r4 = new OffsetRange(3, 4);
 
-    assertTrue(r2.compareTo(r3) == 0);
-    assertTrue(r2.compareTo(r1) == 1);
-    assertTrue(r2.compareTo(r4) == -1);
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
   }
 }



Mime
View raw message