Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7CA4E100C7 for ; Thu, 17 Oct 2013 05:34:51 +0000 (UTC) Received: (qmail 97098 invoked by uid 500); 17 Oct 2013 05:34:06 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 96883 invoked by uid 500); 17 Oct 2013 05:33:54 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 96864 invoked by uid 99); 17 Oct 2013 05:33:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 05:33:51 +0000 X-ASF-Spam-Status: No, hits=-1999.0 required=5.0 tests=ALL_TRUSTED,FRT_OFFER2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 05:33:47 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4D49D2388C2D; Thu, 17 Oct 2013 05:33:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1532967 [2/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/m... Date: Thu, 17 Oct 2013 05:33:01 -0000 To: hdfs-commits@hadoop.apache.org From: wang@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131017053302.4D49D2388C2D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Oct 17 05:32:42 2013 @@ -18,8 +18,10 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; @@ -28,6 +30,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileUtil; @@ -42,8 +45,10 @@ import org.apache.hadoop.hdfs.protocol.D import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.nfs.AccessPrivilege; import org.apache.hadoop.nfs.NfsExports; +import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.NfsTime; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.IdUserGroup; @@ -65,10 +70,12 @@ import org.apache.hadoop.nfs.nfs3.reques import org.apache.hadoop.nfs.nfs3.request.READ3Request; import org.apache.hadoop.nfs.nfs3.request.READDIR3Request; import org.apache.hadoop.nfs.nfs3.request.READDIRPLUS3Request; +import org.apache.hadoop.nfs.nfs3.request.READLINK3Request; import org.apache.hadoop.nfs.nfs3.request.REMOVE3Request; import org.apache.hadoop.nfs.nfs3.request.RENAME3Request; import org.apache.hadoop.nfs.nfs3.request.RMDIR3Request; import org.apache.hadoop.nfs.nfs3.request.SETATTR3Request; +import org.apache.hadoop.nfs.nfs3.request.SYMLINK3Request; import org.apache.hadoop.nfs.nfs3.request.SetAttr3; import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; @@ -94,24 +101,31 @@ import org.apache.hadoop.nfs.nfs3.respon import org.apache.hadoop.nfs.nfs3.response.RMDIR3Response; import org.apache.hadoop.nfs.nfs3.response.SETATTR3Response; import org.apache.hadoop.nfs.nfs3.response.SYMLINK3Response; -import org.apache.hadoop.nfs.nfs3.response.VoidResponse; import org.apache.hadoop.nfs.nfs3.response.WRITE3Response; import org.apache.hadoop.nfs.nfs3.response.WccAttr; import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcCallCache; import org.apache.hadoop.oncrpc.RpcDeniedReply; +import org.apache.hadoop.oncrpc.RpcInfo; import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcReply; +import org.apache.hadoop.oncrpc.RpcResponse; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; -import org.apache.hadoop.oncrpc.security.CredentialsSys; import org.apache.hadoop.oncrpc.security.Credentials; -import org.apache.hadoop.oncrpc.security.Verifier; +import org.apache.hadoop.oncrpc.security.CredentialsSys; +import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor; import org.apache.hadoop.oncrpc.security.SecurityHandler; import org.apache.hadoop.oncrpc.security.SysSecurityHandler; -import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor; +import org.apache.hadoop.oncrpc.security.Verifier; +import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.security.AccessControlException; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; /** * RPC program corresponding to nfs daemon. See {@link Nfs3}. @@ -121,7 +135,7 @@ public class RpcProgramNfs3 extends RpcP public static final FsPermission umask = new FsPermission( (short) DEFAULT_UMASK); - private static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class); + static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class); private static final int MAX_READ_TRANSFER_SIZE = 64 * 1024; private static final int MAX_WRITE_TRANSFER_SIZE = 64 * 1024; private static final int MAX_READDIR_TRANSFER_SIZE = 64 * 1024; @@ -146,14 +160,15 @@ public class RpcProgramNfs3 extends RpcP private Statistics statistics; private String writeDumpDir; // The dir save dump files + private final RpcCallCache rpcCallCache; + public RpcProgramNfs3() throws IOException { this(new Configuration()); } - public RpcProgramNfs3(Configuration config) - throws IOException { + public RpcProgramNfs3(Configuration config) throws IOException { super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM, - Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100); + Nfs3Constant.VERSION, Nfs3Constant.VERSION); config.set(FsPermission.UMASK_LABEL, "000"); iug = new IdUserGroup(); @@ -179,6 +194,8 @@ public class RpcProgramNfs3 extends RpcP } else { clearDirectory(writeDumpDir); } + + rpcCallCache = new RpcCallCache("NFS3", 256); } private void clearDirectory(String writeDumpDir) throws IOException { @@ -205,12 +222,12 @@ public class RpcProgramNfs3 extends RpcP if (LOG.isDebugEnabled()) { LOG.debug("NFS NULL"); } - return new VoidResponse(Nfs3Status.NFS3_OK); + return new NFS3Response(Nfs3Status.NFS3_OK); } @Override - public GETATTR3Response getattr(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -290,8 +307,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public SETATTR3Response setattr(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -366,8 +383,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public LOOKUP3Response lookup(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -428,8 +445,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public ACCESS3Response access(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -476,9 +493,70 @@ public class RpcProgramNfs3 extends RpcP } } - public READLINK3Response readlink(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { - return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP); + public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { + READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK); + + if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { + response.setStatus(Nfs3Status.NFS3ERR_ACCES); + return response; + } + + DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + + READLINK3Request request = null; + + try { + request = new READLINK3Request(xdr); + } catch (IOException e) { + LOG.error("Invalid READLINK request"); + return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL); + } + + FileHandle handle = request.getHandle(); + if (LOG.isDebugEnabled()) { + LOG.debug("NFS READLINK fileId: " + handle.getFileId()); + } + + String fileIdPath = Nfs3Utils.getFileIdPath(handle); + try { + String target = dfsClient.getLinkTarget(fileIdPath); + + Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient, + fileIdPath, iug); + if (postOpAttr == null) { + LOG.info("Can't get path for fileId:" + handle.getFileId()); + return new READLINK3Response(Nfs3Status.NFS3ERR_STALE); + } + if (postOpAttr.getType() != NfsFileType.NFSLNK.toValue()) { + LOG.error("Not a symlink, fileId:" + handle.getFileId()); + return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL); + } + if (target == null) { + LOG.error("Symlink target should not be null, fileId:" + + handle.getFileId()); + return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT); + } + if (MAX_READ_TRANSFER_SIZE < target.getBytes().length) { + return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr, null); + } + + return new READLINK3Response(Nfs3Status.NFS3_OK, postOpAttr, + target.getBytes()); + + } catch (IOException e) { + LOG.warn("Readlink error: " + e.getClass(), e); + if (e instanceof FileNotFoundException) { + return new READLINK3Response(Nfs3Status.NFS3ERR_STALE); + } else if (e instanceof AccessControlException) { + return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES); + } + return new READLINK3Response(Nfs3Status.NFS3ERR_IO); + } } @Override @@ -509,7 +587,6 @@ public class RpcProgramNfs3 extends RpcP long offset = request.getOffset(); int count = request.getCount(); - FileHandle handle = request.getHandle(); if (LOG.isDebugEnabled()) { LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset @@ -655,8 +732,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public CREATE3Response create(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public CREATE3Response create(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -765,7 +842,7 @@ public class RpcProgramNfs3 extends RpcP // Add open stream OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir - + "/" + postOpObjAttr.getFileId()); + + "/" + postOpObjAttr.getFileId(), dfsClient, iug); fileHandle = new FileHandle(postOpObjAttr.getFileId()); writeManager.addOpenFileStream(fileHandle, openFileCtx); if (LOG.isDebugEnabled()) { @@ -908,8 +985,7 @@ public class RpcProgramNfs3 extends RpcP } String fileIdPath = dirFileIdPath + "/" + fileName; - HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, - fileIdPath); + HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr); @@ -991,8 +1067,7 @@ public class RpcProgramNfs3 extends RpcP } String fileIdPath = dirFileIdPath + "/" + fileName; - HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, - fileIdPath); + HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc); } @@ -1033,8 +1108,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public RENAME3Response rename(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -1121,18 +1196,96 @@ public class RpcProgramNfs3 extends RpcP } @Override - public SYMLINK3Response symlink(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { - return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP); + public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { + SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK); + + if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) { + response.setStatus(Nfs3Status.NFS3ERR_ACCES); + return response; + } + + DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + + SYMLINK3Request request = null; + try { + request = new SYMLINK3Request(xdr); + } catch (IOException e) { + LOG.error("Invalid SYMLINK request"); + response.setStatus(Nfs3Status.NFS3ERR_INVAL); + return response; + } + + FileHandle dirHandle = request.getHandle(); + String name = request.getName(); + String symData = request.getSymData(); + String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle); + // Don't do any name check to source path, just leave it to HDFS + String linkIdPath = linkDirIdPath + "/" + name; + if (LOG.isDebugEnabled()) { + LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath); + } + + try { + WccData dirWcc = response.getDirWcc(); + WccAttr preOpAttr = Nfs3Utils.getWccAttr(dfsClient, linkDirIdPath); + dirWcc.setPreOpAttr(preOpAttr); + + dfsClient.createSymlink(symData, linkIdPath, false); + // Set symlink attr is considered as to change the attr of the target + // file. So no need to set symlink attr here after it's created. + + HdfsFileStatus linkstat = dfsClient.getFileLinkInfo(linkIdPath); + Nfs3FileAttributes objAttr = Nfs3Utils.getNfs3FileAttrFromFileStatus( + linkstat, iug); + dirWcc + .setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug)); + + return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle( + objAttr.getFileid()), objAttr, dirWcc); + + } catch (IOException e) { + LOG.warn("Exception:" + e); + response.setStatus(Nfs3Status.NFS3ERR_IO); + return response; + } } - public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) { + public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP); } + /** + * Used by readdir and readdirplus to get dirents. It retries the listing if + * the startAfter can't be found anymore. + */ + private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath, + byte[] startAfter) throws IOException { + DirectoryListing dlisting = null; + try { + dlisting = dfsClient.listPaths(dirFileIdPath, startAfter); + } catch (RemoteException e) { + IOException io = e.unwrapRemoteException(); + if (!(io instanceof DirectoryListingStartAfterNotFoundException)) { + throw io; + } + // This happens when startAfter was just deleted + LOG.info("Cookie cound't be found: " + new String(startAfter) + + ", do listing from beginning"); + dlisting = dfsClient + .listPaths(dirFileIdPath, HdfsFileStatus.EMPTY_NAME); + } + return dlisting; + } + @Override - public READDIR3Response readdir(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1170,7 +1323,7 @@ public class RpcProgramNfs3 extends RpcP + cookie + " count: " + count); } - HdfsFileStatus dirStatus; + HdfsFileStatus dirStatus = null; DirectoryListing dlisting = null; Nfs3FileAttributes postOpAttr = null; long dotdotFileId = 0; @@ -1214,8 +1367,8 @@ public class RpcProgramNfs3 extends RpcP String inodeIdPath = Nfs3Utils.getFileIdPath(cookie); startAfter = inodeIdPath.getBytes(); } - dlisting = dfsClient.listPaths(dirFileIdPath, startAfter); - + + dlisting = listPaths(dfsClient, dirFileIdPath, startAfter); postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug); if (postOpAttr == null) { LOG.error("Can't get path for fileId:" + handle.getFileId()); @@ -1298,11 +1451,15 @@ public class RpcProgramNfs3 extends RpcP } long dirCount = request.getDirCount(); if (dirCount <= 0) { - LOG.info("Nonpositive count in invalid READDIRPLUS request:" + dirCount); - return new READDIRPLUS3Response(Nfs3Status.NFS3_OK); + LOG.info("Nonpositive dircount in invalid READDIRPLUS request:" + dirCount); + return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL); } int maxCount = request.getMaxCount(); - + if (maxCount <= 0) { + LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount); + return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL); + } + if (LOG.isDebugEnabled()) { LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: " + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount); @@ -1352,8 +1509,8 @@ public class RpcProgramNfs3 extends RpcP String inodeIdPath = Nfs3Utils.getFileIdPath(cookie); startAfter = inodeIdPath.getBytes(); } - dlisting = dfsClient.listPaths(dirFileIdPath, startAfter); - + + dlisting = listPaths(dfsClient, dirFileIdPath, startAfter); postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug); if (postOpDirAttr == null) { LOG.info("Can't get path for fileId:" + handle.getFileId()); @@ -1421,8 +1578,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public FSSTAT3Response fsstat(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1479,8 +1636,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public FSINFO3Response fsinfo(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1531,8 +1688,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public PATHCONF3Response pathconf(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1578,7 +1735,7 @@ public class RpcProgramNfs3 extends RpcP } @Override - public COMMIT3Response commit(XDR xdr, + public COMMIT3Response commit(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, InetAddress client) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); @@ -1620,18 +1777,10 @@ public class RpcProgramNfs3 extends RpcP long commitOffset = (request.getCount() == 0) ? 0 : (request.getOffset() + request.getCount()); - int status; - if (writeManager.handleCommit(handle, commitOffset)) { - status = Nfs3Status.NFS3_OK; - } else { - status = Nfs3Status.NFS3ERR_IO; - } - Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient, - handle, iug); - WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr); - return new COMMIT3Response(status, fileWcc, - Nfs3Constant.WRITE_COMMIT_VERF); - + // Insert commit as an async request + writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid, + preOpAttr); + return null; } catch (IOException e) { LOG.warn("Exception ", e); Nfs3FileAttributes postOpAttr = null; @@ -1657,24 +1806,53 @@ public class RpcProgramNfs3 extends RpcP } @Override - public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out, - InetAddress client, Channel channel) { + public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcCall rpcCall = (RpcCall) info.header(); final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); + byte[] data = new byte[info.data().readableBytes()]; + info.data().readBytes(data); + XDR xdr = new XDR(data); + XDR out = new XDR(); + InetAddress client = ((InetSocketAddress) info.remoteAddress()) + .getAddress(); + Channel channel = info.channel(); Credentials credentials = rpcCall.getCredential(); // Ignore auth only for NFSPROC3_NULL, especially for Linux clients. if (nfsproc3 != NFSPROC3.NULL) { - if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS - && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) { - LOG.info("Wrong RPC AUTH flavor, " - + rpcCall.getCredential().getFlavor() + if (credentials.getFlavor() != AuthFlavor.AUTH_SYS + && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) { + LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor() + " is not AUTH_SYS or RPCSEC_GSS."); XDR reply = new XDR(); - reply = RpcDeniedReply.voidReply(reply, xid, + RpcDeniedReply rdr = new RpcDeniedReply(xid, RpcReply.ReplyState.MSG_ACCEPTED, - RpcDeniedReply.RejectState.AUTH_ERROR); - return reply; + RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); + rdr.write(reply); + + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); + return; + } + } + + if (!isIdempotent(rpcCall)) { + RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client, + xid); + if (entry != null) { // in cache + if (entry.isCompleted()) { + LOG.info("Sending the cached reply to retransmitted request " + xid); + RpcUtil.sendRpcResponse(ctx, entry.getResponse()); + return; + } else { // else request is in progress + LOG.info("Retransmitted request, transaction still in progress " + + xid); + // Ignore the request and do nothing + return; + } } } @@ -1695,9 +1873,19 @@ public class RpcProgramNfs3 extends RpcP } else if (nfsproc3 == NFSPROC3.READLINK) { response = readlink(xdr, securityHandler, client); } else if (nfsproc3 == NFSPROC3.READ) { + if (LOG.isDebugEnabled()) { + LOG.debug(Nfs3Utils.READ_RPC_START + xid); + } response = read(xdr, securityHandler, client); + if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) { + LOG.debug(Nfs3Utils.READ_RPC_END + xid); + } } else if (nfsproc3 == NFSPROC3.WRITE) { + if (LOG.isDebugEnabled()) { + LOG.debug(Nfs3Utils.WRITE_RPC_START + xid); + } response = write(xdr, channel, xid, securityHandler, client); + // Write end debug trace is in Nfs3Utils.writeChannel } else if (nfsproc3 == NFSPROC3.CREATE) { response = create(xdr, securityHandler, client); } else if (nfsproc3 == NFSPROC3.MKDIR) { @@ -1725,16 +1913,31 @@ public class RpcProgramNfs3 extends RpcP } else if (nfsproc3 == NFSPROC3.PATHCONF) { response = pathconf(xdr, securityHandler, client); } else if (nfsproc3 == NFSPROC3.COMMIT) { - response = commit(xdr, securityHandler, client); + response = commit(xdr, channel, xid, securityHandler, client); } else { // Invalid procedure - RpcAcceptedReply.voidReply(out, xid, - RpcAcceptedReply.AcceptState.PROC_UNAVAIL); + RpcAcceptedReply.getInstance(xid, + RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( + out); + } + if (response == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No sync response, expect an async response for request XID=" + + rpcCall.getXid()); + } + return; } - if (response != null) { - out = response.send(out, xid); + // TODO: currently we just return VerifierNone + out = response.writeHeaderAndResponse(out, xid, new VerifierNone()); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + + if (!isIdempotent(rpcCall)) { + rpcCallCache.callCompleted(client, xid, rsp); } - return out; + + RpcUtil.sendRpcResponse(ctx, rsp); } @Override Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Thu Oct 17 05:32:42 2013 @@ -20,13 +20,18 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.jboss.netty.channel.Channel; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * WriteCtx saves the context of one write request, such as request, channel, * xid and reply status. @@ -48,14 +53,31 @@ class WriteCtx { private final FileHandle handle; private final long offset; private final int count; + + //Only needed for overlapped write, referring OpenFileCtx.addWritesToCache() + private final int originalCount; + public static final int INVALID_ORIGINAL_COUNT = -1; + + public int getOriginalCount() { + return originalCount; + } + private final WriteStableHow stableHow; - private byte[] data; + private volatile ByteBuffer data; private final Channel channel; private final int xid; private boolean replied; - private DataState dataState; + /** + * Data belonging to the same {@link OpenFileCtx} may be dumped to a file. + * After being dumped to the file, the corresponding {@link WriteCtx} records + * the dump file and the offset. + */ + private RandomAccessFile raf; + private long dumpFileOffset; + + private volatile DataState dataState; public DataState getDataState() { return dataState; @@ -64,12 +86,13 @@ class WriteCtx { public void setDataState(DataState dataState) { this.dataState = dataState; } - - private RandomAccessFile raf; - private long dumpFileOffset; - // Return the dumped data size - public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf) + /** + * Writing the data into a local file. After the writing, if + * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set + * {@link #dataState} to DUMPED. + */ + long dumpData(FileOutputStream dumpOut, RandomAccessFile raf) throws IOException { if (dataState != DataState.ALLOW_DUMP) { if (LOG.isTraceEnabled()) { @@ -78,54 +101,104 @@ class WriteCtx { } return 0; } + + // Resized write should not allow dump + Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT); + this.raf = raf; dumpFileOffset = dumpOut.getChannel().position(); - dumpOut.write(data, 0, count); + dumpOut.write(data.array(), 0, count); if (LOG.isDebugEnabled()) { LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset); } - data = null; - dataState = DataState.DUMPED; - return count; + // it is possible that while we dump the data, the data is also being + // written back to HDFS. After dump, if the writing back has not finished + // yet, we change its flag to DUMPED and set the data to null. Otherwise + // this WriteCtx instance should have been removed from the buffer. + if (dataState == DataState.ALLOW_DUMP) { + synchronized (this) { + if (dataState == DataState.ALLOW_DUMP) { + data = null; + dataState = DataState.DUMPED; + return count; + } + } + } + return 0; } - public FileHandle getHandle() { + FileHandle getHandle() { return handle; } - public long getOffset() { + long getOffset() { return offset; } - public int getCount() { + int getCount() { return count; } - public WriteStableHow getStableHow() { + WriteStableHow getStableHow() { return stableHow; } - public byte[] getData() throws IOException { + @VisibleForTesting + ByteBuffer getData() throws IOException { if (dataState != DataState.DUMPED) { - if (data == null) { - throw new IOException("Data is not dumpted but has null:" + this); - } - } else { - // read back - if (data != null) { - throw new IOException("Data is dumpted but not null"); - } - data = new byte[count]; - raf.seek(dumpFileOffset); - int size = raf.read(data, 0, count); - if (size != count) { - throw new IOException("Data count is " + count + ", but read back " - + size + "bytes"); + synchronized (this) { + if (dataState != DataState.DUMPED) { + Preconditions.checkState(data != null); + return data; + } } } + // read back from dumped file + this.loadData(); return data; } + private void loadData() throws IOException { + Preconditions.checkState(data == null); + byte[] rawData = new byte[count]; + raf.seek(dumpFileOffset); + int size = raf.read(rawData, 0, count); + if (size != count) { + throw new IOException("Data count is " + count + ", but read back " + + size + "bytes"); + } + data = ByteBuffer.wrap(rawData); + } + + public void writeData(HdfsDataOutputStream fos) throws IOException { + Preconditions.checkState(fos != null); + + ByteBuffer dataBuffer = null; + try { + dataBuffer = getData(); + } catch (Exception e1) { + LOG.error("Failed to get request data offset:" + offset + " count:" + + count + " error:" + e1); + throw new IOException("Can't get WriteCtx.data"); + } + + byte[] data = dataBuffer.array(); + int position = dataBuffer.position(); + int limit = dataBuffer.limit(); + Preconditions.checkState(limit - position == count); + // Modified write has a valid original count + if (position != 0) { + if (limit != getOriginalCount()) { + throw new IOException("Modified write has differnt original size." + + "buff position:" + position + " buff limit:" + limit + ". " + + toString()); + } + } + + // Now write data + fos.write(data, position, count); + } + Channel getChannel() { return channel; } @@ -142,11 +215,13 @@ class WriteCtx { this.replied = replied; } - WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow, - byte[] data, Channel channel, int xid, boolean replied, DataState dataState) { + WriteCtx(FileHandle handle, long offset, int count, int originalCount, + WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid, + boolean replied, DataState dataState) { this.handle = handle; this.offset = offset; this.count = count; + this.originalCount = originalCount; this.stableHow = stableHow; this.data = data; this.channel = channel; @@ -159,7 +234,7 @@ class WriteCtx { @Override public String toString() { return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count - + " stableHow:" + stableHow + " replied:" + replied + " dataState:" - + dataState + " xid:" + xid; + + " originalCount:" + originalCount + " stableHow:" + stableHow + + " replied:" + replied + " dataState:" + dataState + " xid:" + xid; } } \ No newline at end of file Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Oct 17 05:32:42 2013 @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS; import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.IdUserGroup; @@ -36,9 +37,11 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; +import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response; import org.apache.hadoop.nfs.nfs3.response.WRITE3Response; import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.util.Daemon; import org.jboss.netty.channel.Channel; @@ -66,8 +69,8 @@ public class WriteManager { */ private long streamTimeout; - public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second - public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second + public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes + public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { openFileMap.put(h, ctx); @@ -118,7 +121,8 @@ public class WriteManager { byte[] data = request.getData().array(); if (data.length < count) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( + new XDR(), xid, new VerifierNone()), xid); return; } @@ -155,7 +159,8 @@ public class WriteManager { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, count, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( + new XDR(), xid, new VerifierNone()), xid); return; } @@ -163,7 +168,7 @@ public class WriteManager { String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY, Nfs3Constant.FILE_DUMP_DIR_DEFAULT); openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/" - + fileHandle.getFileId()); + + fileHandle.getFileId(), dfsClient, iug); addOpenFileStream(fileHandle, openFileCtx); if (LOG.isDebugEnabled()) { LOG.debug("opened stream for file:" + fileHandle.getFileId()); @@ -173,65 +178,55 @@ public class WriteManager { // Add write into the async job queue openFileCtx.receivedNewWrite(dfsClient, request, channel, xid, asyncDataService, iug); - // Block stable write - if (request.getStableHow() != WriteStableHow.UNSTABLE) { - if (handleCommit(fileHandle, offset + count)) { - Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug); - WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), - postOpAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, - fileWcc, count, request.getStableHow(), - Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); - } else { - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); - } - } - return; } - boolean handleCommit(FileHandle fileHandle, long commitOffset) { + void handleCommit(DFSClient dfsClient, FileHandle fileHandle, + long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { + int status; OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + if (openFileCtx == null) { LOG.info("No opened stream for fileId:" + fileHandle.getFileId() - + " commitOffset=" + commitOffset); - return true; - } - long timeout = 30 * 1000; // 30 seconds - long startCommit = System.currentTimeMillis(); - while (true) { - int ret = openFileCtx.checkCommit(commitOffset); - if (ret == OpenFileCtx.COMMIT_FINISHED) { - // Committed - return true; - } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) { - LOG.info("Inactive stream, fileId=" + fileHandle.getFileId() - + " commitOffset=" + commitOffset); - return true; - } - assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR); - if (ret == OpenFileCtx.COMMIT_ERROR) { - return false; - } + + " commitOffset=" + commitOffset + ". Return success in this case."); + status = Nfs3Status.NFS3_OK; - if (LOG.isDebugEnabled()) { - LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId() - + " commitOffset=" + commitOffset); - } - if (System.currentTimeMillis() - startCommit > timeout) { - // Commit took too long, return error - return false; - } - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId() - + " commitOffset=" + commitOffset); - return false; + } else { + COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset, + channel, xid, preOpAttr); + switch (ret) { + case COMMIT_DO_SYNC: + case COMMIT_FINISHED: + case COMMIT_INACTIVE_CTX: + status = Nfs3Status.NFS3_OK; + break; + case COMMIT_INACTIVE_WITH_PENDING_WRITE: + case COMMIT_ERROR: + status = Nfs3Status.NFS3ERR_IO; + break; + case COMMIT_WAIT: + // Do nothing. Commit is async now. + return; + default: + throw new RuntimeException("Should not get commit return code:" + + ret.name()); } - }// while + } + + // Send out the response + Nfs3FileAttributes postOpAttr = null; + try { + String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileid()); + postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); + } catch (IOException e1) { + LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileid()); + } + WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr); + COMMIT3Response response = new COMMIT3Response(status, fileWcc, + Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannelCommit(channel, + response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()), + xid); } /** Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Thu Oct 17 05:32:42 2013 @@ -33,11 +33,13 @@ import org.apache.hadoop.nfs.nfs3.reques import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; import org.apache.hadoop.oncrpc.RegistrationClient; import org.apache.hadoop.oncrpc.RpcCall; -import org.apache.hadoop.oncrpc.RpcFrameDecoder; import org.apache.hadoop.oncrpc.RpcReply; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.SimpleTcpClient; import org.apache.hadoop.oncrpc.SimpleTcpClientHandler; import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.CredentialsNone; +import org.apache.hadoop.oncrpc.security.VerifierNone; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; @@ -58,15 +60,9 @@ public class TestOutOfOrderWrite { static XDR create() { XDR request = new XDR(); - RpcCall.write(request, 0x8000004c, Nfs3Constant.PROGRAM, - Nfs3Constant.VERSION, Nfs3Constant.NFSPROC3.CREATE.getValue()); - - // credentials - request.writeInt(0); // auth null - request.writeInt(0); // length zero - // verifier - request.writeInt(0); // auth null - request.writeInt(0); // length zero + RpcCall.getInstance(0x8000004c, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION, + Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(), + new VerifierNone()).write(request); SetAttr3 objAttr = new SetAttr3(); CREATE3Request createReq = new CREATE3Request(new FileHandle("/"), @@ -78,15 +74,10 @@ public class TestOutOfOrderWrite { static XDR write(FileHandle handle, int xid, long offset, int count, byte[] data) { XDR request = new XDR(); - RpcCall.write(request, xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION, - Nfs3Constant.NFSPROC3.WRITE.getValue()); + RpcCall.getInstance(xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION, + Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(), + new VerifierNone()).write(request); - // credentials - request.writeInt(0); // auth null - request.writeInt(0); // length zero - // verifier - request.writeInt(0); // auth null - request.writeInt(0); // length zero WRITE3Request write1 = new WRITE3Request(handle, offset, count, WriteStableHow.UNSTABLE, ByteBuffer.wrap(data)); write1.serialize(request); @@ -145,8 +136,9 @@ public class TestOutOfOrderWrite { protected ChannelPipelineFactory setPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { - return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler( - request)); + return Channels.pipeline( + RpcUtil.constructRpcFrameDecoder(), + new WriteHandler(request)); } }; return this.pipelineFactory; @@ -174,11 +166,11 @@ public class TestOutOfOrderWrite { XDR writeReq; writeReq = write(handle, 0x8000005c, 2000, 1000, data3); - Nfs3Utils.writeChannel(channel, writeReq); + Nfs3Utils.writeChannel(channel, writeReq, 1); writeReq = write(handle, 0x8000005d, 1000, 1000, data2); - Nfs3Utils.writeChannel(channel, writeReq); + Nfs3Utils.writeChannel(channel, writeReq, 2); writeReq = write(handle, 0x8000005e, 0, 1000, data1); - Nfs3Utils.writeChannel(channel, writeReq); + Nfs3Utils.writeChannel(channel, writeReq, 3); // TODO: convert to Junit test, and validate result automatically } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java Thu Oct 17 05:32:42 2013 @@ -26,6 +26,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co import org.apache.hadoop.oncrpc.RegistrationClient; import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.CredentialsNone; +import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapRequest; @@ -78,11 +80,8 @@ public class TestPortmapRegister { static void createPortmapXDRheader(XDR xdr_out, int procedure) { // TODO: Move this to RpcRequest - RpcCall.write(xdr_out, 0, 100000, 2, procedure); - xdr_out.writeInt(0); //no auth - xdr_out.writeInt(0); - xdr_out.writeInt(0); - xdr_out.writeInt(0); + RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(), + new VerifierNone()).write(xdr_out); /* xdr_out.putInt(1); //unix auth Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java Thu Oct 17 05:32:42 2013 @@ -27,6 +27,8 @@ import java.net.UnknownHostException; import org.apache.hadoop.nfs.nfs3.Nfs3Constant; import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.CredentialsNone; +import org.apache.hadoop.oncrpc.security.VerifierNone; // TODO: convert this to Junit public class TestUdpServer { @@ -82,7 +84,8 @@ public class TestUdpServer { static void createPortmapXDRheader(XDR xdr_out, int procedure) { // Make this a method - RpcCall.write(xdr_out, 0, 100000, 2, procedure); + RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(), + new VerifierNone()).write(xdr_out); } static void testGetportMount() { Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Thu Oct 17 05:32:42 2013 @@ -17,41 +17,44 @@ */ package org.apache.hadoop.hdfs.nfs.nfs3; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSClient; import org.junit.Test; -import org.mockito.Mockito; public class TestDFSClientCache { @Test - public void testLruTable() throws IOException { - DFSClientCache cache = new DFSClientCache(new Configuration(), 3); - DFSClient client = Mockito.mock(DFSClient.class); - cache.put("a", client); - assertTrue(cache.containsKey("a")); - - cache.put("b", client); - cache.put("c", client); - cache.put("d", client); - assertTrue(cache.usedSize() == 3); - assertFalse(cache.containsKey("a")); - - // Cache should have d,c,b in LRU order - assertTrue(cache.containsKey("b")); - // Do a lookup to make b the most recently used - assertTrue(cache.get("b") != null); - - cache.put("e", client); - assertTrue(cache.usedSize() == 3); - // c should be replaced with e, and cache has e,b,d - assertFalse(cache.containsKey("c")); - assertTrue(cache.containsKey("e")); - assertTrue(cache.containsKey("b")); - assertTrue(cache.containsKey("d")); + public void testEviction() throws IOException { + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost"); + + // Only one entry will be in the cache + final int MAX_CACHE_SIZE = 2; + + DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); + + DFSClient c1 = cache.get("test1"); + assertTrue(cache.get("test1").toString().contains("ugi=test1")); + assertEquals(c1, cache.get("test1")); + assertFalse(isDfsClientClose(c1)); + + cache.get("test2"); + assertTrue(isDfsClientClose(c1)); + assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size()); + } + + private static boolean isDfsClientClose(DFSClient c) { + try { + c.exists(""); + } catch (IOException e) { + return e.getMessage().equals("Filesystem closed"); + } + return false; } } Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java?rev=1532967&r1=1532966&r2=1532967&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java Thu Oct 17 05:32:42 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -51,8 +52,9 @@ public class TestOffsetRange { OffsetRange r3 = new OffsetRange(1, 3); OffsetRange r4 = new OffsetRange(3, 4); - assertTrue(r2.compareTo(r3) == 0); - assertTrue(r2.compareTo(r1) == 1); - assertTrue(r2.compareTo(r4) == -1); + assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3)); + assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2)); + assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0); + assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0); } }