hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cur...@apache.org
Subject svn commit: r1619019 [2/11] - in /hadoop/common/branches/YARN-1051/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs-httpfs/src/main/ja...
Date Wed, 20 Aug 2014 01:34:47 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Wed Aug 20 01:34:29 2014
@@ -140,7 +140,7 @@ public class RpcProgramNfs3 extends RpcP
   public static final int DEFAULT_UMASK = 0022;
   public static final FsPermission umask = new FsPermission(
       (short) DEFAULT_UMASK);
-  
+
   static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
 
   private final NfsConfiguration config;
@@ -149,14 +149,14 @@ public class RpcProgramNfs3 extends RpcP
   private final DFSClientCache clientCache;
 
   private final NfsExports exports;
-  
+
   private final short replication;
   private final long blockSize;
   private final int bufferSize;
   private final boolean aixCompatMode;
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
-  
+
   private final RpcCallCache rpcCallCache;
 
   public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
@@ -166,11 +166,11 @@ public class RpcProgramNfs3 extends RpcP
         NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
         Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
         allowInsecurePorts);
-   
+
     this.config = config;
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup(config);
-    
+
     aixCompatMode = config.getBoolean(
         NfsConfigKeys.AIX_COMPAT_MODE_KEY,
         NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
@@ -184,7 +184,7 @@ public class RpcProgramNfs3 extends RpcP
     bufferSize = config.getInt(
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-    
+
     writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
         NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
     boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
@@ -216,12 +216,23 @@ public class RpcProgramNfs3 extends RpcP
       throw new IOException("Cannot create dump directory " + dumpDir);
     }
   }
-  
+
   @Override
   public void startDaemons() {
      writeManager.startAsyncDataSerivce();
   }
-  
+
+  // Checks the type of IOException and maps it to appropriate Nfs3Status code.
+  private int mapErrorStatus(IOException e) {
+    if (e instanceof FileNotFoundException) {
+      return Nfs3Status.NFS3ERR_STALE;
+    } else if (e instanceof AccessControlException) {
+      return Nfs3Status.NFS3ERR_ACCES;
+    } else {
+      return Nfs3Status.NFS3ERR_IO;
+    }
+  }
+
   /******************************************************
    * RPC call handlers
    ******************************************************/
@@ -236,20 +247,25 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
+    return getattr(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     GETATTR3Request request = null;
     try {
       request = new GETATTR3Request(xdr);
@@ -280,7 +296,8 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.info("Can't get file attribute, fileId=" + handle.getFileId(), e);
-      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      response.setStatus(status);
       return response;
     }
     if (attrs == null) {
@@ -297,7 +314,7 @@ public class RpcProgramNfs3 extends RpcP
   private void setattrInternal(DFSClient dfsClient, String fileIdPath,
       SetAttr3 newAttr, boolean setMode) throws IOException {
     EnumSet<SetAttrField> updateFields = newAttr.getUpdateFields();
-    
+
     if (setMode && updateFields.contains(SetAttrField.MODE)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("set new mode:" + newAttr.getMode());
@@ -328,14 +345,19 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
+    return setattr(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     SETATTR3Request request = null;
     try {
       request = new SETATTR3Request(xdr);
@@ -373,9 +395,9 @@ public class RpcProgramNfs3 extends RpcP
           return new SETATTR3Response(Nfs3Status.NFS3ERR_NOT_SYNC, wccData);
         }
       }
-      
+
       // check the write access privilege
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             preOpWcc, preOpAttr));
       }
@@ -394,30 +416,33 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileIdPath: " + fileIdPath, e1);
       }
-      if (e instanceof AccessControlException) {
-        return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, wccData);
-      } else {
-        return new SETATTR3Response(Nfs3Status.NFS3ERR_IO, wccData);
-      }
+
+      int status = mapErrorStatus(e);
+      return new SETATTR3Response(status, wccData);
     }
   }
 
   @Override
   public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
+    return lookup(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     LOOKUP3Request request = null;
     try {
       request = new LOOKUP3Request(xdr);
@@ -460,26 +485,32 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new LOOKUP3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new LOOKUP3Response(status);
     }
   }
-  
+
   @Override
   public ACCESS3Response access(XDR xdr, RpcInfo info) {
+    return access(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     ACCESS3Request request = null;
     try {
       request = new ACCESS3Request(xdr);
@@ -493,7 +524,7 @@ public class RpcProgramNfs3 extends RpcP
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS ACCESS fileId: " + handle.getFileId());
-    } 
+    }
 
     try {
       // HDFS-5804 removed supserUserClient access
@@ -506,7 +537,7 @@ public class RpcProgramNfs3 extends RpcP
       int access = Nfs3Utils.getAccessRightsForUserGroup(
           securityHandler.getUid(), securityHandler.getGid(),
           securityHandler.getAuxGids(), attrs);
-      
+
       return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
     } catch (RemoteException r) {
       LOG.warn("Exception ", r);
@@ -521,20 +552,26 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new ACCESS3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new ACCESS3Response(status);
     }
   }
 
   @Override
   public READLINK3Response readlink(XDR xdr, RpcInfo info) {
+    return readlink(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -588,39 +625,33 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Readlink error: " + e.getClass(), e);
-      if (e instanceof FileNotFoundException) {
-        return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
-      } else if (e instanceof AccessControlException) {
-        return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
-      }
-      return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READLINK3Response(status);
     }
   }
 
   @Override
   public READ3Response read(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return read(xdr, securityHandler, remoteAddress);
+    return read(xdr, getSecurityHandler(info), info.remoteAddress());
   }
-  
+
   @VisibleForTesting
   READ3Response read(XDR xdr, SecurityHandler securityHandler,
       SocketAddress remoteAddress) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     final String userName = securityHandler.getUser();
-    
+
     if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(userName);
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     READ3Request request = null;
 
     try {
@@ -670,7 +701,7 @@ public class RpcProgramNfs3 extends RpcP
         return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
       }
     }
-    
+
     // In case there is buffered data for the same file, flush it. This can be
     // optimized later by reading from the cache.
     int ret = writeManager.commitBeforeRead(dfsClient, handle, offset + count);
@@ -693,6 +724,10 @@ public class RpcProgramNfs3 extends RpcP
         FSDataInputStream fis = clientCache.getDfsInputStream(userName,
             Nfs3Utils.getFileIdPath(handle));
 
+        if (fis == null) {
+            return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
+        }
+
         try {
           readCount = fis.read(offset, readbuffer, 0, count);
         } catch (IOException e) {
@@ -725,7 +760,8 @@ public class RpcProgramNfs3 extends RpcP
     } catch (IOException e) {
       LOG.warn("Read error: " + e.getClass() + " offset: " + offset
           + " count: " + count, e);
-      return new READ3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READ3Response(status);
     }
   }
 
@@ -737,7 +773,7 @@ public class RpcProgramNfs3 extends RpcP
     SocketAddress remoteAddress = info.remoteAddress();
     return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
   }
-  
+
   @VisibleForTesting
   WRITE3Response write(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, SocketAddress remoteAddress) {
@@ -748,7 +784,7 @@ public class RpcProgramNfs3 extends RpcP
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     WRITE3Request request = null;
 
     try {
@@ -781,13 +817,13 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for fileId:" + handle.getFileId());
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
-      
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("requesed offset=" + offset + " and current filesize="
             + preOpAttr.getSize());
@@ -807,8 +843,10 @@ public class RpcProgramNfs3 extends RpcP
       }
       WccAttr attr = preOpAttr == null ? null : Nfs3Utils.getWccAttr(preOpAttr);
       WccData fileWcc = new WccData(attr, postOpAttr);
-      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0,
-          request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+
+      int status = mapErrorStatus(e);
+      return new WRITE3Response(status, fileWcc, 0, request.getStableHow(),
+          Nfs3Constant.WRITE_COMMIT_VERF);
     }
 
     return null;
@@ -816,11 +854,9 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public CREATE3Response create(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return create(xdr, securityHandler, remoteAddress);
+    return create(xdr, getSecurityHandler(info), info.remoteAddress());
   }
-  
+
   @VisibleForTesting
   CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
       SocketAddress remoteAddress) {
@@ -830,7 +866,7 @@ public class RpcProgramNfs3 extends RpcP
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     CREATE3Request request = null;
 
     try {
@@ -868,7 +904,7 @@ public class RpcProgramNfs3 extends RpcP
         LOG.error("Can't get path for dirHandle:" + dirHandle);
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
@@ -881,15 +917,15 @@ public class RpcProgramNfs3 extends RpcP
       FsPermission permission = setAttr3.getUpdateFields().contains(
           SetAttrField.MODE) ? new FsPermission((short) setAttr3.getMode())
           : FsPermission.getDefault().applyUMask(umask);
-          
+
       EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ? 
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : 
           EnumSet.of(CreateFlag.CREATE);
-      
+
       fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
           flag, false, replication, blockSize, null, bufferSize, null),
           statistics);
-      
+
       if ((createMode == Nfs3Constant.CREATE_UNCHECKED)
           || (createMode == Nfs3Constant.CREATE_GUARDED)) {
         // Set group if it's not specified in the request.
@@ -903,7 +939,7 @@ public class RpcProgramNfs3 extends RpcP
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           dfsClient, dirFileIdPath, iug);
-      
+
       // Add open stream
       OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
           writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
@@ -920,7 +956,7 @@ public class RpcProgramNfs3 extends RpcP
               + fileHandle.getFileId());
         }
       }
-      
+
     } catch (IOException e) {
       LOG.error("Exception", e);
       if (fos != null) {
@@ -940,29 +976,30 @@ public class RpcProgramNfs3 extends RpcP
               + dirHandle.getFileId(), e1);
         }
       }
-      if (e instanceof AccessControlException) {
-        return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, fileHandle,
-            postOpObjAttr, dirWcc);
-      } else {
-        return new CREATE3Response(Nfs3Status.NFS3ERR_IO, fileHandle,
-            postOpObjAttr, dirWcc);
-      }
+
+      int status = mapErrorStatus(e);
+      return new CREATE3Response(status, fileHandle, postOpObjAttr, dirWcc);
     }
-    
+
     return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
         dirWcc);
   }
 
   @Override
   public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
+    return mkdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     MKDIR3Request request = null;
 
     try {
@@ -992,11 +1029,11 @@ public class RpcProgramNfs3 extends RpcP
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
       }
-      
+
       final String fileIdPath = dirFileIdPath + "/" + fileName;
       SetAttr3 setAttr3 = request.getObjAttr();
       FsPermission permission = setAttr3.getUpdateFields().contains(
@@ -1015,7 +1052,7 @@ public class RpcProgramNfs3 extends RpcP
         setAttr3.setGid(securityHandler.getGid());
       }
       setattrInternal(dfsClient, fileIdPath, setAttr3, false);
-      
+
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       objFileHandle = new FileHandle(postOpObjAttr.getFileId());
       WccData dirWcc = Nfs3Utils.createWccData(
@@ -1032,15 +1069,11 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new MKDIR3Response(Nfs3Status.NFS3ERR_PERM, objFileHandle,
-            postOpObjAttr, dirWcc);
-      } else {
-        return new MKDIR3Response(Nfs3Status.NFS3ERR_IO, objFileHandle,
-            postOpObjAttr, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new MKDIR3Response(status, objFileHandle, postOpObjAttr, dirWcc);
     }
   }
 
@@ -1048,21 +1081,22 @@ public class RpcProgramNfs3 extends RpcP
   public READDIR3Response mknod(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
-  
+
   @Override
   public REMOVE3Response remove(XDR xdr, RpcInfo info) {
     return remove(xdr, getSecurityHandler(info), info.remoteAddress());
   }
- 
+
   @VisibleForTesting
-  REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) {
+  REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     REMOVE3Request request = null;
     try {
       request = new REMOVE3Request(xdr);
@@ -1120,26 +1154,29 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
-      } else {
-        return new REMOVE3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new REMOVE3Response(status, dirWcc);
     }
   }
 
   @Override
   public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
+    return rmdir(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     RMDIR3Request request = null;
     try {
       request = new RMDIR3Request(xdr);
@@ -1164,10 +1201,10 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
         return new RMDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           preOpDirAttr);
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
       }
 
@@ -1179,7 +1216,7 @@ public class RpcProgramNfs3 extends RpcP
       if (!fstat.isDir()) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, errWcc);
       }
-      
+
       if (fstat.getChildrenNum() > 0) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, errWcc);
       }
@@ -1202,26 +1239,29 @@ public class RpcProgramNfs3 extends RpcP
           LOG.info("Can't get postOpDirAttr for " + dirFileIdPath, e1);
         }
       }
+
       WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           postOpDirAttr);
-      if (e instanceof AccessControlException) {
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
-      } else {
-        return new RMDIR3Response(Nfs3Status.NFS3ERR_IO, dirWcc);
-      }
+      int status = mapErrorStatus(e);
+      return new RMDIR3Response(status, dirWcc);
     }
   }
 
   @Override
   public RENAME3Response rename(XDR xdr, RpcInfo info) {
+    return rename(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     RENAME3Request request = null;
     try {
       request = new RENAME3Request(xdr);
@@ -1258,8 +1298,8 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for toHandle fileId:" + toHandle.getFileId());
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
             fromPreOpAttr);
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1280,7 +1320,7 @@ public class RpcProgramNfs3 extends RpcP
       return new RENAME3Response(Nfs3Status.NFS3_OK, fromDirWcc, toDirWcc);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      // Try to return correct WccData      
+      // Try to return correct WccData
       try {
         fromDirWcc = Nfs3Utils.createWccData(
             Nfs3Utils.getWccAttr(fromPreOpAttr), dfsClient, fromDirFileIdPath,
@@ -1291,25 +1331,27 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get postOpDirAttr for " + fromDirFileIdPath + " or"
             + toDirFileIdPath, e1);
       }
-      if (e instanceof AccessControlException) {
-        return new RENAME3Response(Nfs3Status.NFS3ERR_PERM, fromDirWcc,
-            toDirWcc);
-      } else {
-        return new RENAME3Response(Nfs3Status.NFS3ERR_IO, fromDirWcc, toDirWcc);
-      }
+
+      int status = mapErrorStatus(e);
+      return new RENAME3Response(status, fromDirWcc, toDirWcc);
     }
   }
 
   @Override
   public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
+    return symlink(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1355,7 +1397,8 @@ public class RpcProgramNfs3 extends RpcP
 
     } catch (IOException e) {
       LOG.warn("Exception:" + e);
-      response.setStatus(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      response.setStatus(status);
       return response;
     }
   }
@@ -1387,28 +1430,27 @@ public class RpcProgramNfs3 extends RpcP
     }
     return dlisting;
   }
-  
+
   @Override
   public READDIR3Response readdir(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return readdir(xdr, securityHandler, remoteAddress);
+    return readdir(xdr, getSecurityHandler(info), info.remoteAddress());
   }
+
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
       SocketAddress remoteAddress) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
-    
+
     if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     READDIR3Request request = null;
     try {
       request = new READDIR3Request(xdr);
@@ -1427,7 +1469,7 @@ public class RpcProgramNfs3 extends RpcP
       LOG.info("Nonpositive count in invalid READDIR request:" + count);
       return new READDIR3Response(Nfs3Status.NFS3_OK);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: "
           + cookie + " count: " + count);
@@ -1492,7 +1534,7 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      
+
       dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpAttr == null) {
@@ -1501,21 +1543,22 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new READDIR3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READDIR3Response(status);
     }
 
     /**
      * Set up the dirents in the response. fileId is used as the cookie with one
      * exception. Linux client can either be stuck with "ls" command (on REHL)
      * or report "Too many levels of symbolic links" (Ubuntu).
-     * 
+     *
      * The problem is that, only two items returned, "." and ".." when the
      * namespace is empty. Both of them are "/" with the same cookie(root
      * fileId). Linux client doesn't think such a directory is a real directory.
      * Even though NFS protocol specifies cookie is an opaque data, Linux client
      * somehow doesn't like an empty dir returns same cookie for both "." and
      * "..".
-     * 
+     *
      * The workaround is to use 0 as the cookie for "." and always return "." as
      * the first entry in readdir/readdirplus response.
      */
@@ -1523,7 +1566,7 @@ public class RpcProgramNfs3 extends RpcP
     int n = (int) Math.min(fstatus.length, count-2);
     boolean eof = (n < fstatus.length) ? false : (dlisting
         .getRemainingEntries() == 0);
-    
+
     Entry3[] entries;
     if (cookie == 0) {
       entries = new Entry3[n + 2];
@@ -1543,7 +1586,7 @@ public class RpcProgramNfs3 extends RpcP
             fstatus[i].getLocalName(), fstatus[i].getFileId());
       }
     }
-    
+
     DirList3 dirList = new READDIR3Response.DirList3(entries, eof);
     return new READDIR3Response(Nfs3Status.NFS3_OK, postOpAttr,
         dirStatus.getModificationTime(), dirList);
@@ -1551,9 +1594,7 @@ public class RpcProgramNfs3 extends RpcP
 
   @Override
   public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
-    SecurityHandler securityHandler = getSecurityHandler(info);
-    SocketAddress remoteAddress = info.remoteAddress();
-    return readdirplus(xdr, securityHandler, remoteAddress);
+    return readdirplus(xdr, getSecurityHandler(info), info.remoteAddress());
   }
 
   @VisibleForTesting
@@ -1562,12 +1603,12 @@ public class RpcProgramNfs3 extends RpcP
     if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
-    
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
-    
+
     READDIRPLUS3Request request = null;
     try {
       request = new READDIRPLUS3Request(xdr);
@@ -1592,7 +1633,7 @@ public class RpcProgramNfs3 extends RpcP
       LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
           + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1655,7 +1696,7 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      
+
       dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpDirAttr == null) {
@@ -1664,19 +1705,20 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new READDIRPLUS3Response(status);
     }
-    
+
     // Set up the dirents in the response
     HdfsFileStatus[] fstatus = dlisting.getPartialListing();
     int n = (int) Math.min(fstatus.length, dirCount-2);
     boolean eof = (n < fstatus.length) ? false : (dlisting
         .getRemainingEntries() == 0);
-    
+
     READDIRPLUS3Response.EntryPlus3[] entries;
     if (cookie == 0) {
       entries = new READDIRPLUS3Response.EntryPlus3[n+2];
-      
+
       entries[0] = new READDIRPLUS3Response.EntryPlus3(
           postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle(
               postOpDirAttr.getFileId()));
@@ -1720,23 +1762,28 @@ public class RpcProgramNfs3 extends RpcP
     return new READDIRPLUS3Response(Nfs3Status.NFS3_OK, postOpDirAttr,
         dirStatus.getModificationTime(), dirListPlus);
   }
-  
+
   @Override
   public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
+    return fsstat(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     FSSTAT3Request request = null;
     try {
       request = new FSSTAT3Request(xdr);
@@ -1754,14 +1801,14 @@ public class RpcProgramNfs3 extends RpcP
       FsStatus fsStatus = dfsClient.getDiskStatus();
       long totalBytes = fsStatus.getCapacity();
       long freeBytes = fsStatus.getRemaining();
-      
+
       Nfs3FileAttributes attrs = writeManager.getFileAttr(dfsClient, handle,
           iug);
       if (attrs == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new FSSTAT3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       long maxFsObjects = config.getLong("dfs.max.objects", 0);
       if (maxFsObjects == 0) {
         // A value of zero in HDFS indicates no limit to the number
@@ -1769,7 +1816,7 @@ public class RpcProgramNfs3 extends RpcP
         // Long.MAX_VALUE so 32bit client won't complain.
         maxFsObjects = Integer.MAX_VALUE;
       }
-      
+
       return new FSSTAT3Response(Nfs3Status.NFS3_OK, attrs, totalBytes,
           freeBytes, freeBytes, maxFsObjects, maxFsObjects, maxFsObjects, 0);
     } catch (RemoteException r) {
@@ -1785,26 +1832,32 @@ public class RpcProgramNfs3 extends RpcP
       }
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new FSSTAT3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new FSSTAT3Response(status);
     }
   }
 
   @Override
   public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
+    return fsinfo(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     FSINFO3Request request = null;
     try {
       request = new FSINFO3Request(xdr);
@@ -1835,7 +1888,7 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new FSINFO3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
+
       int fsProperty = Nfs3Constant.FSF3_CANSETTIME
           | Nfs3Constant.FSF3_HOMOGENEOUS;
 
@@ -1843,26 +1896,32 @@ public class RpcProgramNfs3 extends RpcP
           wtmax, wtmax, 1, dtperf, Long.MAX_VALUE, new NfsTime(1), fsProperty);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new FSINFO3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new FSINFO3Response(status);
     }
   }
 
   @Override
   public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
+    return pathconf(xdr, getSecurityHandler(info), info.remoteAddress());
+  }
+
+  @VisibleForTesting
+  PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
-    
-    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
+
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
-    
-    SecurityHandler securityHandler = getSecurityHandler(info);
+
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     PATHCONF3Request request = null;
     try {
       request = new PATHCONF3Request(xdr);
@@ -1890,22 +1949,30 @@ public class RpcProgramNfs3 extends RpcP
           HdfsConstants.MAX_PATH_LENGTH, true, false, false, true);
     } catch (IOException e) {
       LOG.warn("Exception ", e);
-      return new PATHCONF3Response(Nfs3Status.NFS3ERR_IO);
+      int status = mapErrorStatus(e);
+      return new PATHCONF3Response(status);
     }
   }
 
   @Override
   public COMMIT3Response commit(XDR xdr, RpcInfo info) {
-    //Channel channel, int xid,
-    //    SecurityHandler securityHandler, InetAddress client) {
-    COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return commit(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  COMMIT3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
+    COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
     }
-    
+
     COMMIT3Request request = null;
     try {
       request = new COMMIT3Request(xdr);
@@ -1929,21 +1996,19 @@ public class RpcProgramNfs3 extends RpcP
         LOG.info("Can't get path for fileId:" + handle.getFileId());
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
-      
-      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
+
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
-      
+
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
-      
+
       // Insert commit as an async request
-      RpcCall rpcCall = (RpcCall) info.header();
-      int xid = rpcCall.getXid();
       writeManager.handleCommit(dfsClient, handle, commitOffset,
-          info.channel(), xid, preOpAttr);
+          channel, xid, preOpAttr);
       return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
@@ -1953,9 +2018,11 @@ public class RpcProgramNfs3 extends RpcP
       } catch (IOException e1) {
         LOG.info("Can't get postOpAttr for fileId: " + handle.getFileId(), e1);
       }
+
       WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(Nfs3Status.NFS3ERR_IO, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
+      int status = mapErrorStatus(e);
+      return new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
     }
   }
 
@@ -1973,7 +2040,7 @@ public class RpcProgramNfs3 extends RpcP
     RpcCall rpcCall = (RpcCall) info.header();
     return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
   }
-  
+
   @Override
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     RpcCall rpcCall = (RpcCall) info.header();
@@ -1986,7 +2053,7 @@ public class RpcProgramNfs3 extends RpcP
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
         .getAddress();
     Credentials credentials = rpcCall.getCredential();
-    
+
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -2023,7 +2090,7 @@ public class RpcProgramNfs3 extends RpcP
         }
       }
     }
-    
+
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
@@ -2040,7 +2107,7 @@ public class RpcProgramNfs3 extends RpcP
     } else if (nfsproc3 == NFSPROC3.READ) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
-      }    
+      }
       response = read(xdr, info);
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
@@ -2053,7 +2120,7 @@ public class RpcProgramNfs3 extends RpcP
       // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
       response = create(xdr, info);
-    } else if (nfsproc3 == NFSPROC3.MKDIR) {      
+    } else if (nfsproc3 == NFSPROC3.MKDIR) {
       response = mkdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
       response = symlink(xdr, info);
@@ -2104,18 +2171,12 @@ public class RpcProgramNfs3 extends RpcP
 
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
-  
+
   @Override
   protected boolean isIdempotent(RpcCall call) {
-    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure()); 
+    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure());
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
-  
-  private boolean checkAccessPrivilege(RpcInfo info,
-      final AccessPrivilege expected) {
-    SocketAddress remoteAddress = info.remoteAddress();
-    return checkAccessPrivilege(remoteAddress, expected);
-  }
 
   private boolean checkAccessPrivilege(SocketAddress remoteAddress,
       final AccessPrivilege expected) {
@@ -2139,7 +2200,7 @@ public class RpcProgramNfs3 extends RpcP
     }
     return true;
   }
-  
+
   @VisibleForTesting
   WriteManager getWriteManager() {
     return this.writeManager;

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java Wed Aug 20 01:34:29 2014
@@ -18,19 +18,601 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import org.jboss.netty.channel.Channel;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
+import org.apache.hadoop.nfs.nfs3.request.LOOKUP3Request;
+import org.apache.hadoop.nfs.nfs3.request.READ3Request;
+import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.ACCESS3Response;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
+import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
+import org.apache.hadoop.nfs.nfs3.response.FSSTAT3Response;
+import org.apache.hadoop.nfs.nfs3.response.FSINFO3Response;
+import org.apache.hadoop.nfs.nfs3.response.GETATTR3Response;
+import org.apache.hadoop.nfs.nfs3.response.LOOKUP3Response;
+import org.apache.hadoop.nfs.nfs3.response.PATHCONF3Response;
+import org.apache.hadoop.nfs.nfs3.response.READ3Response;
+import org.apache.hadoop.nfs.nfs3.response.REMOVE3Response;
+import org.apache.hadoop.nfs.nfs3.response.RMDIR3Response;
+import org.apache.hadoop.nfs.nfs3.response.RENAME3Response;
+import org.apache.hadoop.nfs.nfs3.response.READDIR3Response;
+import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
+import org.apache.hadoop.nfs.nfs3.response.READLINK3Response;
+import org.apache.hadoop.nfs.nfs3.response.SETATTR3Response;
+import org.apache.hadoop.nfs.nfs3.response.SYMLINK3Response;
+import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
+import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 
 
 /**
  * Tests for {@link RpcProgramNfs3}
  */
 public class TestRpcProgramNfs3 {
+  static DistributedFileSystem hdfs;
+  static MiniDFSCluster cluster = null;
+  static NfsConfiguration config = new NfsConfiguration();
+  static NameNode nn;
+  static Nfs3 nfs;
+  static RpcProgramNfs3 nfsd;
+  static SecurityHandler securityHandler;
+  static SecurityHandler securityHandlerUnpriviledged;
+  static String testdir = "/tmp";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    String currentUser = System.getProperty("user.name");
+
+    config.set("fs.permissions.umask-mode", "u=rwx,g=,o=");
+    config.set(DefaultImpersonationProvider.getTestProvider()
+        .getProxySuperuserGroupConfKey(currentUser), "*");
+    config.set(DefaultImpersonationProvider.getTestProvider()
+        .getProxySuperuserIpConfKey(currentUser), "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+    cluster.waitActive();
+    hdfs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+
+    // Use ephemeral ports in case tests are running in parallel
+    config.setInt("nfs3.mountd.port", 0);
+    config.setInt("nfs3.server.port", 0);
+
+    // Start NFS with allowed.hosts set to "* rw"
+    config.set("dfs.nfs.exports.allowed.hosts", "* rw");
+    nfs = new Nfs3(config);
+    nfs.startServiceInternal(false);
+    nfsd = (RpcProgramNfs3) nfs.getRpcProgram();
+
+
+    // Mock SecurityHandler which returns system user.name
+    securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(currentUser);
+
+    // Mock SecurityHandler which returns a dummy username "harry"
+    securityHandlerUnpriviledged = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandlerUnpriviledged.getUser()).thenReturn("harry");
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void createFiles() throws IllegalArgumentException, IOException {
+    hdfs.delete(new Path(testdir), true);
+    hdfs.mkdirs(new Path(testdir));
+    hdfs.mkdirs(new Path(testdir + "/foo"));
+    DFSTestUtil.createFile(hdfs, new Path(testdir + "/bar"), 0, (short) 1, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testGetattr() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    GETATTR3Response response1 = nfsd.getattr(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    GETATTR3Response response2 = nfsd.getattr(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testSetattr() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("bar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeBoolean(false);
+
+    // Attempt by an unprivileged user should fail.
+    SETATTR3Response response1 = nfsd.setattr(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    SETATTR3Response response2 = nfsd.setattr(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testLookup() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    LOOKUP3Request lookupReq = new LOOKUP3Request(handle, "bar");
+    XDR xdr_req = new XDR();
+    lookupReq.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    LOOKUP3Response response1 = nfsd.lookup(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    LOOKUP3Response response2 = nfsd.lookup(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testAccess() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    ACCESS3Response response1 = nfsd.access(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    ACCESS3Response response2 = nfsd.access(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testReadlink() throws Exception {
+    // Create a symlink first.
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    SYMLINK3Response response = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response.getStatus());
+
+    // Now perform readlink operations.
+    FileHandle handle2 = response.getObjFileHandle();
+    XDR xdr_req2 = new XDR();
+    handle2.serialize(xdr_req2);
+
+    // Attempt by an unprivileged user should fail.
+    READLINK3Response response1 = nfsd.readlink(xdr_req2.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    READLINK3Response response2 = nfsd.readlink(xdr_req2.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRead() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+
+    READ3Request readReq = new READ3Request(handle, 0, 5);
+    XDR xdr_req = new XDR();
+    readReq.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    READ3Response response1 = nfsd.read(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    READ3Response response2 = nfsd.read(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testWrite() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+
+    byte[] buffer = new byte[10];
+    for (int i = 0; i < 10; i++) {
+      buffer[i] = (byte) i;
+    }
+
+    WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
+        WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
+    XDR xdr_req = new XDR();
+    writeReq.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    WRITE3Response response1 = nfsd.write(xdr_req.asReadOnlyWrap(),
+        null, 1, securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    WRITE3Response response2 = nfsd.write(xdr_req.asReadOnlyWrap(),
+        null, 1, securityHandler,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect response:", null, response2);
+  }
+
+  @Test(timeout = 60000)
+  public void testCreate() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    xdr_req.writeInt(Nfs3Constant.CREATE_UNCHECKED);
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    CREATE3Response response1 = nfsd.create(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    CREATE3Response response2 = nfsd.create(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testMkdir() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    // Symlink creation (this test calls symlink(), not mkdir()) by an unprivileged user should fail.
+    SYMLINK3Response response1 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Symlink creation (this test calls symlink(), not mkdir()) by a privileged user should pass.
+    SYMLINK3Response response2 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testSymlink() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+    SetAttr3 symAttr = new SetAttr3();
+    symAttr.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    // Attempt by an unprivileged user should fail.
+    SYMLINK3Response response1 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    SYMLINK3Response response2 = nfsd.symlink(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRemove() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("bar");
+
+    // Attempt by an unprivileged user should fail.
+    REMOVE3Response response1 = nfsd.remove(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    REMOVE3Response response2 = nfsd.remove(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRmdir() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("foo");
+
+    // Attempt by an unprivileged user should fail.
+    RMDIR3Response response1 = nfsd.rmdir(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    RMDIR3Response response2 = nfsd.rmdir(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testRename() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeString("bar");
+    handle.serialize(xdr_req);
+    xdr_req.writeString("fubar");
+
+    // Attempt by an unprivileged user should fail.
+    RENAME3Response response1 = nfsd.rename(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    RENAME3Response response2 = nfsd.rename(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testReaddir() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeInt(100);
+
+    // Attempt by an unprivileged user should fail.
+    READDIR3Response response1 = nfsd.readdir(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    READDIR3Response response2 = nfsd.readdir(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testReaddirplus() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeInt(3);
+    xdr_req.writeInt(2);
+
+    // Attempt by an unprivileged user should fail.
+    READDIRPLUS3Response response1 = nfsd.readdirplus(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    READDIRPLUS3Response response2 = nfsd.readdirplus(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testFsstat() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    FSSTAT3Response response1 = nfsd.fsstat(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    FSSTAT3Response response2 = nfsd.fsstat(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testFsinfo() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    FSINFO3Response response1 = nfsd.fsinfo(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    FSINFO3Response response2 = nfsd.fsinfo(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testPathconf() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+
+    // Attempt by an unprivileged user should fail.
+    PATHCONF3Response response1 = nfsd.pathconf(xdr_req.asReadOnlyWrap(),
+        securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    PATHCONF3Response response2 = nfsd.pathconf(xdr_req.asReadOnlyWrap(),
+        securityHandler, new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3_OK,
+        response2.getStatus());
+  }
+
+  @Test(timeout = 60000)
+  public void testCommit() throws Exception {
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
+    long dirId = status.getFileId();
+    FileHandle handle = new FileHandle(dirId);
+    XDR xdr_req = new XDR();
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0);
+    xdr_req.writeInt(5);
+
+    Channel ch = Mockito.mock(Channel.class);
+
+    // Attempt by an unprivileged user should fail.
+    COMMIT3Response response1 = nfsd.commit(xdr_req.asReadOnlyWrap(),
+        ch, 1, securityHandlerUnpriviledged,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES,
+        response1.getStatus());
+
+    // Attempt by a privileged user should pass.
+    COMMIT3Response response2 = nfsd.commit(xdr_req.asReadOnlyWrap(),
+        ch, 1, securityHandler,
+        new InetSocketAddress("localhost", 1234));
+    assertEquals("Incorrect COMMIT3Response:", null, response2);
+  }
+
   @Test(timeout=1000)
   public void testIdempotent() {
     Object[][] procedures = {

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Wed Aug 20 01:34:29 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentNa
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
@@ -407,4 +408,80 @@ public class TestWrites {
       }
     }
   }
+
+  @Test
+  public void testOOOWrites() throws IOException, InterruptedException {
+    NfsConfiguration config = new NfsConfiguration();
+    MiniDFSCluster cluster = null;
+    RpcProgramNfs3 nfsd;
+    final int bufSize = 32;
+    final int numOOO = 3;
+    SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+    String currentUser = System.getProperty("user.name");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(currentUser),
+        "*");
+    config.set(
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(currentUser),
+        "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+    // Use ephemeral ports in case tests are running in parallel
+    config.setInt("nfs3.mountd.port", 0);
+    config.setInt("nfs3.server.port", 0);
+
+    try {
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+
+      Nfs3 nfs3 = new Nfs3(config);
+      nfs3.startServiceInternal(false);
+      nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+      DFSClient dfsClient = new DFSClient(NameNode.getAddress(config), config);
+      HdfsFileStatus status = dfsClient.getFileInfo("/");
+      FileHandle rootHandle = new FileHandle(status.getFileId());
+
+      CREATE3Request createReq = new CREATE3Request(rootHandle,
+          "out-of-order-write" + System.currentTimeMillis(),
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr = new XDR();
+      createReq.serialize(createXdr);
+      CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", 1234));
+      FileHandle handle = createRsp.getObjHandle();
+
+      byte[][] oooBuf = new byte[numOOO][bufSize];
+      for (int i = 0; i < numOOO; i++) {
+        Arrays.fill(oooBuf[i], (byte) i);
+      }
+
+      for (int i = 0; i < numOOO; i++) {
+        final long offset = (numOOO - 1 - i) * bufSize;
+        WRITE3Request writeReq = new WRITE3Request(handle, offset, bufSize,
+            WriteStableHow.UNSTABLE, ByteBuffer.wrap(oooBuf[i]));
+        XDR writeXdr = new XDR();
+        writeReq.serialize(writeXdr);
+        nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+            new InetSocketAddress("localhost", 1234));
+      }
+
+      waitWrite(nfsd, handle, 60000);
+      READ3Request readReq = new READ3Request(handle, bufSize, bufSize);
+      XDR readXdr = new XDR();
+      readReq.serialize(readXdr);
+      READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+          securityHandler, new InetSocketAddress("localhost", config.getInt(
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+              NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT)));
+      assertTrue(Arrays.equals(oooBuf[1], readRsp.getData().array()));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 20 01:34:29 2014
@@ -130,10 +130,13 @@ Trunk (Unreleased)
     HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable
     directory. (Jing Zhao via wheat9)
 
+    HDFS-6482. Use block ID-based block layout on datanodes (James Thomas via
+    Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES
-
+ 
     HADOOP-9635 Fix potential Stack Overflow in DomainSocket.c (V. Karthik Kumar
                 via cmccabe)
 
@@ -184,9 +187,6 @@ Trunk (Unreleased)
 
     HDFS-3549. Fix dist tar build fails in hadoop-hdfs-raid project. (Jason Lowe via daryn)
 
-    HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException 
-    if option is specified without values. ( Madhukara Phatak via umamahesh) 
-
     HDFS-3614. Revert unused MiniDFSCluster constructor from HDFS-3049.
     (acmurthy via eli)
 
@@ -317,12 +317,106 @@ Release 2.6.0 - UNRELEASED
     HDFS-6701. Make seed optional in NetworkTopology#sortByDistance.
     (Ashwin Shankar via wang)
 
+    HDFS-6755. There is an unnecessary sleep in the code path where
+    DFSOutputStream#close gives up its attempt to contact the namenode
+    (mitdesai21 via cmccabe)
+
+    HDFS-6750. The DataNode should use its shared memory segment to mark
+    short-circuit replicas that have been unlinked as stale (cmccabe)
+
+    HDFS-6739. Add getDatanodeStorageReport to ClientProtocol. (szetszwo)
+
+    HDFS-6665. Add tests for XAttrs in combination with viewfs.
+    (Stephen Chu via wang)
+
+    HDFS-6778. The extended attributes javadoc should simply refer to the
+    user docs. (clamb via wang)
+
+    HDFS-6570. add api that enables checking if a user has certain permissions on
+    a file. (Jitendra Pandey via cnauroth)
+
+    HDFS-6441. Add ability to exclude/include specific datanodes while
+    balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
+
+    HDFS-6685. Balancer should preserve storage type of replicas.  (szetszwo)
+
+    HDFS-6798. Add test case for incorrect data node condition during
+    balancing. (Benoy Antony via Arpit Agarwal)
+
+    HDFS-6796. Improve the argument check during balancer command line parsing.
+    (Benoy Antony via szetszwo)
+
+    HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo
+    where possible (Arpit Agarwal)
+
+    HDFS-6802. Some tests in TestDFSClientFailover are missing @Test
+    annotation. (Akira Ajisaka via wang)
+
+    HDFS-6788. Improve synchronization in BPOfferService with read write lock.
+    (Yongjun Zhang via wang)
+
+    HDFS-6787. Remove duplicate code in FSDirectory#unprotectedConcat. (Yi Liu via umamahesh)
+
+    HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher to
+    standalone classes and separate KeyManager from NameNodeConnector.
+    (szetszwo)
+
+    HDFS-6812. Remove addBlock and replaceBlock from DatanodeDescriptor.
+    (szetszwo)
+
+    HDFS-6781. Separate HDFS commands from CommandsManual.apt.vm. (Akira
+    Ajisaka via Arpit Agarwal)
+
+    HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
+    necessary. (Lei Xu via atm)
+
+    HDFS-6740. Make FSDataset support adding data volumes dynamically. (Lei
+    Xu via atm)
+
+    HDFS-6722. Display readable last contact time for dead nodes on NN webUI.
+    (Ming Ma via wheat9)
+
+    HDFS-6772. Get DN storages out of blockContentsStale state faster after
+    NN restarts. (Ming Ma via Arpit Agarwal)
+
+    HDFS-573. Porting libhdfs to Windows. (cnauroth)
+
+    HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
+    jing9)
+
+    HDFS-6837. Code cleanup for Balancer and Dispatcher. (szetszwo via
+    jing9)
+
+    HDFS-6838. Code cleanup for unnecessary INode replacement.
+    (Jing Zhao via wheat9)
+
+    HDFS-6836. HDFS INFO logging is verbose & uses file appenders. (Xiaoyu
+    Yao via Arpit Agarwal)
+
+    HDFS-6567. Normalize the order of public final in HdfsFileStatus.
+    (Tassapol Athiapinya via wheat9)
+
+    HDFS-6849. Replace HttpFS custom proxyuser handling with common 
+    implementation. (tucu)
+
+    HDFS-6850. Move NFS out of order write unit tests into TestWrites class.
+    (Zhe Zhang via atm)
+
+    HDFS-6188. An ip whitelist based implementation of TrustedChannelResolver.
+    (Benoy Antony via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
 
   BUG FIXES
 
+    HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for 
+    insecure HDFS (Allen Wittenauer via raviprak)
+
+    HDFS-6517. Remove hadoop-metrics2.properties from hdfs project (Akira 
+    AJISAKA via aw)
+
     HDFS-6617. Flake TestDFSZKFailoverController.testManualFailoverWithDFSHAAdmin
     due to a long edit log sync op. (Liang Xie via cnauroth)
 
@@ -376,6 +470,59 @@ Release 2.6.0 - UNRELEASED
     HDFS-6752. Avoid Address bind errors in TestDatanodeConfig#testMemlockLimit
     (vinayakumarb)
 
+    HDFS-6749. FSNamesystem methods should call resolvePath.
+    (Charles Lamb via cnauroth)
+
+    HDFS-4629. Using com.sun.org.apache.xml.internal.serialize.* in
+    XmlEditsVisitor.java is JVM vendor specific. Breaks IBM JAVA.
+    (Amir Sanjar via stevel)
+
+    HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException 
+    if option is specified without values. ( Madhukara Phatak via umamahesh) 
+
+    HDFS-6797. DataNode logs wrong layoutversion during upgrade. (Benoy Antony
+    via Arpit Agarwal)
+
+    HDFS-6810. StorageReport array is initialized with wrong size in
+    DatanodeDescriptor#getStorageReports. (szetszwo via Arpit Agarwal)
+
+    HDFS-5723. Append failed FINALIZED replica should not be accepted as valid
+    when that block is under construction (vinayakumarb)
+
+    HDFS-5185. DN fails to startup if one of the data dir is full. (vinayakumarb)
+
+    HDFS-6451. NFS should not return NFS3ERR_IO for AccessControlException 
+    (Abhiraj Butala via brandonli)
+
+    HDFS-6717. JIRA HDFS-5804 breaks default nfs-gateway behavior for unsecured config
+    (brandonli)
+
+    HDFS-6790. DFSUtil Should Use configuration.getPassword for SSL passwords
+    (Larry McCay via brandonli)
+
+    HDFS-6791. A block could remain under replicated if all of its replicas are on
+    decommissioned nodes. (Ming Ma via jing9)
+
+    HDFS-6582. Missing null check in RpcProgramNfs3#read(XDR, SecurityHandler)
+    (Abhiraj Butala via brandonli)
+
+    HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
+    block replica (Arpit Agarwal)
+
+    HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate
+    responses to Balancer (vinayakumarb)
+
+    HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu and Colin Patrick McCabe via umamahesh)
+
+    HDFS-6825. Edit log corruption due to delayed block removal.
+    (Yongjun Zhang via wang)
+
+    HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade
+    (brandonli)
+
+    HDFS-6868. portmap and nfs3 are documented as hadoop commands instead of hdfs
+    (brandonli)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -928,6 +1075,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6723. New NN webUI no longer displays decommissioned state for dead node.
     (Ming Ma via wheat9)
 
+    HDFS-6768. Fix a few unit tests that use hard-coded port numbers. (Arpit
+    Agarwal)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)



Mime
View raw message