hadoop-hdfs-commits mailing list archives

From: vinayakum...@apache.org
Subject: svn commit: r1604086 [1/2] - in /hadoop/common/branches/HDFS-5442/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apac...
Date: Fri, 20 Jun 2014 05:04:00 GMT
Author: vinayakumarb
Date: Fri Jun 20 05:03:56 2014
New Revision: 1604086

URL: http://svn.apache.org/r1604086
Log:
Merged latest changes from trunk

Added:
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java
      - copied unchanged from r1604085, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java
Removed:
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
Modified:
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/xattr.proto
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/Federation.apt.vm
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsNfsGateway.apt.vm
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsUserGuide.apt.vm
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/site/xdoc/HdfsRollingUpgrade.xml
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestXAttrWithSnapshot.java
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
    hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml

Propchange: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1603355-1604085

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java Fri Jun 20 05:03:56 2014
@@ -51,7 +51,10 @@ public class NfsConfigKeys {
   public static final String DFS_NFS_KEYTAB_FILE_KEY = "nfs.keytab.file";
   public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "nfs.kerberos.principal";
   public static final String DFS_NFS_REGISTRATION_PORT_KEY = "nfs.registration.port";
-  public static final int    DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
-  public static final String  DFS_NFS_ALLOW_INSECURE_PORTS_KEY = "nfs.allow.insecure.ports";
-  public static final boolean DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT = true;
-}
\ No newline at end of file
+  public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
+  public static final String DFS_NFS_PORT_MONITORING_DISABLED_KEY = "nfs.port.monitoring.disabled";
+  public static final boolean DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT = true;
+
+  public static final String  AIX_COMPAT_MODE_KEY = "nfs.aix.compatibility.mode.enabled";
+  public static final boolean AIX_COMPAT_MODE_DEFAULT = false;
+}
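
For context: the renamed key is read exactly like the old one, as the Nfs3.java hunk later in this commit shows. A minimal sketch of reading both new keys (the surrounding scaffolding is illustrative, not part of the diff):

  NfsConfiguration conf = new NfsConfiguration();
  // "nfs.port.monitoring.disabled", default true (monitoring off)
  boolean allowInsecurePorts = conf.getBoolean(
      NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY,
      NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT);
  // "nfs.aix.compatibility.mode.enabled", default false
  boolean aixCompatMode = conf.getBoolean(
      NfsConfigKeys.AIX_COMPAT_MODE_KEY,
      NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);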

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java Fri Jun 20 05:03:56 2014
@@ -36,6 +36,8 @@ public class NfsConfiguration extends Hd
             NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY),
         new DeprecationDelta("nfs3.mountd.port",
             NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY),
+        new DeprecationDelta("dfs.nfs.exports.cache.size",
+            Nfs3Constant.NFS_EXPORTS_CACHE_SIZE_KEY),
         new DeprecationDelta("dfs.nfs.exports.cache.expirytime.millis",
             Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY),
         new DeprecationDelta("hadoop.nfs.userupdate.milly",
@@ -49,6 +51,18 @@ public class NfsConfiguration extends Hd
         new DeprecationDelta("dfs.nfs3.stream.timeout",
             NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY),
         new DeprecationDelta("dfs.nfs3.export.point",
-            NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY) });
+            NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY),
+        new DeprecationDelta("nfs.allow.insecure.ports",
+            NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY),
+        new DeprecationDelta("dfs.nfs.keytab.file",
+            NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY),
+        new DeprecationDelta("dfs.nfs.kerberos.principal",
+            NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY),
+        new DeprecationDelta("dfs.nfs.rtmax",
+            NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY),
+        new DeprecationDelta("dfs.nfs.wtmax",
+            NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_KEY),
+        new DeprecationDelta("dfs.nfs.dtmax",
+            NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_KEY) });
   }
 }
\ No newline at end of file
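
Each DeprecationDelta registers the old property name as an alias for the new key, so existing client configs keep working after the rename. A small sketch of the effect (the property value is hypothetical):

  NfsConfiguration conf = new NfsConfiguration();
  conf.set("dfs.nfs.rtmax", "1048576");  // deprecated name from an old config file
  // The delta registered above resolves the read through the new key:
  int rtmax = conf.getInt(NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY, 0);  // 1048576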

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Fri Jun 20 05:03:56 2014
@@ -194,7 +194,13 @@ public class RpcProgramMountd extends Rp
     if (mntproc == MNTPROC.NULL) {
       out = nullOp(out, xid, client);
     } else if (mntproc == MNTPROC.MNT) {
-      out = mnt(xdr, out, xid, client);
+      // Only do port monitoring for MNT
+      if (!doPortMonitoring(info.remoteAddress())) {
+        out = MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out,
+            xid, null);
+      } else {
+        out = mnt(xdr, out, xid, client);
+      }
     } else if (mntproc == MNTPROC.DUMP) {
       out = dump(out, xid, client);
     } else if (mntproc == MNTPROC.UMNT) {      
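
doPortMonitoring() lives in the RPC program base class and is not part of this diff; roughly, it rejects callers that did not bind a privileged port unless insecure ports are allowed. A hedged sketch of that check (the log message wording is illustrative):

  boolean doPortMonitoring(SocketAddress remoteAddress) {
    if (!allowInsecurePorts) {
      int port = ((InetSocketAddress) remoteAddress).getPort();
      if (port > 1023) {  // client did not bind a privileged (<1024) port
        LOG.warn("Connection attempted from " + remoteAddress
            + " which is an unprivileged port. Rejecting.");
        return false;
      }
    }
    return true;
  }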

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Fri Jun 20 05:03:56 2014
@@ -61,8 +61,8 @@ public class Nfs3 extends Nfs3Base {
     StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
     NfsConfiguration conf = new NfsConfiguration();
     boolean allowInsecurePorts = conf.getBoolean(
-        NfsConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_KEY,
-        NfsConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT);
+        NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY,
+        NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT);
     final Nfs3 nfsServer = new Nfs3(conf, registrationSocket,
         allowInsecurePorts);
     nfsServer.startServiceInternal(true);

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Fri Jun 20 05:03:56 2014
@@ -95,6 +95,7 @@ class OpenFileCtx {
    */
   private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
+  private final boolean aixCompatMode;
   
   // It's updated after each sync to HDFS
   private Nfs3FileAttributes latestAttr;
@@ -199,8 +200,15 @@ class OpenFileCtx {
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
       String dumpFilePath, DFSClient client, IdUserGroup iug) {
+    this(fos, latestAttr, dumpFilePath, client, iug, false);
+  }
+  
+  OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
+      String dumpFilePath, DFSClient client, IdUserGroup iug,
+      boolean aixCompatMode) {
     this.fos = fos;
     this.latestAttr = latestAttr;
+    this.aixCompatMode = aixCompatMode;
     // We use the ReverseComparatorOnMin as the comparator of the map. In this
     // way, we first dump the data with larger offset. In the meanwhile, we
     // retrieve the last element to write back to HDFS.
@@ -780,15 +788,29 @@ class OpenFileCtx {
     }
 
     if (commitOffset > 0) {
-      if (commitOffset > flushed) {
-        if (!fromRead) {
-          CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
-              preOpAttr);
-          pendingCommits.put(commitOffset, commitCtx);
+      if (aixCompatMode) {
+        // The AIX NFS client misinterprets RFC-1813 and will always send 4096
+        // for the commitOffset even if fewer bytes than that have ever (or will
+        // ever) be sent by the client. So, if in AIX compatibility mode, we
+        // will always DO_SYNC if the number of bytes to commit have already all
+        // been flushed, else we will fall through to the logic below which
+        // checks for pending writes in the case that we're being asked to
+        // commit more bytes than have so far been flushed. See HDFS-6549 for
+        // more info.
+        if (commitOffset <= flushed) {
+          return COMMIT_STATUS.COMMIT_DO_SYNC;
         }
-        return COMMIT_STATUS.COMMIT_WAIT;
       } else {
-        return COMMIT_STATUS.COMMIT_DO_SYNC;
+        if (commitOffset > flushed) {
+          if (!fromRead) {
+            CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+                preOpAttr);
+            pendingCommits.put(commitOffset, commitCtx);
+          }
+          return COMMIT_STATUS.COMMIT_WAIT;
+        } else {
+          return COMMIT_STATUS.COMMIT_DO_SYNC;
+        } 
       }
     }
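
Condensed, for commitOffset c and flushed byte count f (with c > 0), the two modes now differ only when c > f:

  // AIX mode:      c <= f -> COMMIT_DO_SYNC
  //                c >  f -> fall through to the pendingWrites checks below
  //                          (AIX may commit offset 4096 having written fewer bytes)
  // standard mode: c <= f -> COMMIT_DO_SYNC
  //                c >  f -> queue a CommitCtx (unless fromRead) and COMMIT_WAIT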
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Fri Jun 20 05:03:56 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
@@ -152,6 +153,7 @@ public class RpcProgramNfs3 extends RpcP
   private final short replication;
   private final long blockSize;
   private final int bufferSize;
+  private final boolean aixCompatMode;
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
   
@@ -169,8 +171,11 @@ public class RpcProgramNfs3 extends RpcP
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup(config);
     
+    aixCompatMode = config.getBoolean(
+        NfsConfigKeys.AIX_COMPAT_MODE_KEY,
+        NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
     exports = NfsExports.getInstance(config);
-    writeManager = new WriteManager(iug, config);
+    writeManager = new WriteManager(iug, config, aixCompatMode);
     clientCache = new DFSClientCache(config);
     replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
@@ -230,15 +235,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -322,9 +327,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -370,7 +375,7 @@ public class RpcProgramNfs3 extends RpcP
       }
       
       // check the write access privilege
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             preOpWcc, preOpAttr));
       }
@@ -398,15 +403,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -460,15 +465,15 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public ACCESS3Response access(XDR xdr, RpcInfo info) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -519,15 +524,16 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READLINK3Response readlink(XDR xdr, RpcInfo info) {
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -591,12 +597,19 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public READ3Response read(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public READ3Response read(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return read(xdr, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  READ3Response read(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     final String userName = securityHandler.getUser();
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
@@ -715,8 +728,17 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public WRITE3Response write(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public WRITE3Response write(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  WRITE3Response write(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
@@ -758,7 +780,7 @@ public class RpcProgramNfs3 extends RpcP
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
@@ -791,8 +813,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public CREATE3Response create(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return create(xdr, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
@@ -838,7 +867,7 @@ public class RpcProgramNfs3 extends RpcP
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
                 preOpDirAttr));
@@ -875,7 +904,8 @@ public class RpcProgramNfs3 extends RpcP
       
       // Add open stream
       OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
-          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
+          aixCompatMode);
       fileHandle = new FileHandle(postOpObjAttr.getFileId());
       if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.warn("Can't add more stream, close it."
@@ -922,9 +952,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -960,7 +990,7 @@ public class RpcProgramNfs3 extends RpcP
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
       }
@@ -1012,15 +1042,15 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response mknod(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  @Override
+  public READDIR3Response mknod(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
   
   @Override
-  public REMOVE3Response remove(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public REMOVE3Response remove(XDR xdr, RpcInfo info) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1093,9 +1123,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1129,7 +1159,7 @@ public class RpcProgramNfs3 extends RpcP
       
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           preOpDirAttr);
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
       }
 
@@ -1175,9 +1205,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RENAME3Response rename(XDR xdr, RpcInfo info) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1221,7 +1251,7 @@ public class RpcProgramNfs3 extends RpcP
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
             fromPreOpAttr);
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1263,15 +1293,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1322,8 +1352,8 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READDIR3Response link(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
@@ -1351,11 +1381,16 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
+  public READDIR3Response readdir(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return readdir(xdr, securityHandler, remoteAddress);
+  }
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+      SocketAddress remoteAddress) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
@@ -1408,9 +1443,24 @@ public class RpcProgramNfs3 extends RpcP
       }
       long cookieVerf = request.getCookieVerf();
       if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
-        LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
-            + " dir cookieVerf:" + dirStatus.getModificationTime());
-        return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        if (aixCompatMode) {
+          // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+          // the same cookieverf value even across VFS-level readdir calls,
+          // instead of getting a new cookieverf for every VFS-level readdir
+          // call, and reusing the cookieverf only in the event that multiple
+          // incremental NFS-level readdir calls must be made to fetch all of
+          // the directory entries. This means that whenever a readdir call is
+          // made by an AIX NFS client for a given directory, and that directory
+          // is subsequently modified, thus changing its mtime, no later readdir
+          // calls will succeed from AIX for that directory until the FS is
+          // unmounted/remounted. See HDFS-6549 for more info.
+          LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+              "mismatches.");
+        } else {
+          LOG.error("CookieVerf mismatch. request cookieVerf: " + cookieVerf
+              + " dir cookieVerf: " + dirStatus.getModificationTime());
+          return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        }
       }
 
       if (cookie == 0) {
@@ -1491,9 +1541,17 @@ public class RpcProgramNfs3 extends RpcP
         dirStatus.getModificationTime(), dirList);
   }
 
-  public READDIRPLUS3Response readdirplus(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+  @Override
+  public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return readdirplus(xdr, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
     
@@ -1550,9 +1608,22 @@ public class RpcProgramNfs3 extends RpcP
       }
       long cookieVerf = request.getCookieVerf();
       if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
-        LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
-            + " dir cookieVerf:" + dirStatus.getModificationTime());
-        return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        if (aixCompatMode) {
+          // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+          // the same cookieverf value even across VFS-level readdir calls,
+          // instead of getting a new cookieverf for every VFS-level readdir
+          // call. This means that whenever a readdir call is made by an AIX NFS
+          // client for a given directory, and that directory is subsequently
+          // modified, thus changing its mtime, no later readdir calls will
+          // succeed for that directory from AIX until the FS is
+          // unmounted/remounted. See HDFS-6549 for more info.
+          LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+              "mismatches.");
+        } else {
+          LOG.error("cookieverf mismatch. request cookieverf: " + cookieVerf
+              + " dir cookieverf: " + dirStatus.getModificationTime());
+          return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        }
       }
 
       if (cookie == 0) {
@@ -1643,15 +1714,15 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1711,15 +1782,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1769,15 +1840,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1816,9 +1887,11 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, RpcInfo info) {
+    //Channel channel, int xid,
+    //    SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1849,7 +1922,7 @@ public class RpcProgramNfs3 extends RpcP
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Constant.WRITE_COMMIT_VERF);
@@ -1859,8 +1932,10 @@ public class RpcProgramNfs3 extends RpcP
           : (request.getOffset() + request.getCount());
       
       // Insert commit as an async request
-      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
-          preOpAttr);
+      RpcCall rpcCall = (RpcCall) info.header();
+      int xid = rpcCall.getXid();
+      writeManager.handleCommit(dfsClient, handle, commitOffset,
+          info.channel(), xid, preOpAttr);
       return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
@@ -1885,11 +1960,16 @@ public class RpcProgramNfs3 extends RpcP
       return null;
     }
   }
+
+  private SecurityHandler getSecurityHandler(RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
+    return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
+  }
   
   @Override
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     RpcCall rpcCall = (RpcCall) info.header();
-    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
+    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());    
     int xid = rpcCall.getXid();
     byte[] data = new byte[info.data().readableBytes()];
     info.data().readBytes(data);
@@ -1897,9 +1977,8 @@ public class RpcProgramNfs3 extends RpcP
     XDR out = new XDR();
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
         .getAddress();
-    Channel channel = info.channel();
-
     Credentials credentials = rpcCall.getCredential();
+    
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -1937,27 +2016,24 @@ public class RpcProgramNfs3 extends RpcP
       }
     }
     
-    SecurityHandler securityHandler = getSecurityHandler(credentials,
-        rpcCall.getVerifier());
-    
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
-      response = getattr(xdr, securityHandler, client);
+      response = getattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
-      response = setattr(xdr, securityHandler, client);
+      response = setattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
-      response = lookup(xdr, securityHandler, client);
+      response = lookup(xdr, info);
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
-      response = access(xdr, securityHandler, client);
+      response = access(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READLINK) {
-      response = readlink(xdr, securityHandler, client);
+      response = readlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READ) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
       }    
-      response = read(xdr, securityHandler, client);
+      response = read(xdr, info);
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
       }
@@ -1965,36 +2041,36 @@ public class RpcProgramNfs3 extends RpcP
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
       }
-      response = write(xdr, channel, xid, securityHandler, client);
+      response = write(xdr, info);
       // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
-      response = create(xdr, securityHandler, client);
+      response = create(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
-      response = mkdir(xdr, securityHandler, client);
+      response = mkdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
-      response = symlink(xdr, securityHandler, client);
+      response = symlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
-      response = mknod(xdr, securityHandler, client);
+      response = mknod(xdr, info);
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
-      response = remove(xdr, securityHandler, client);
+      response = remove(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
-      response = rmdir(xdr, securityHandler, client);
+      response = rmdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RENAME) {
-      response = rename(xdr, securityHandler, client);
+      response = rename(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LINK) {
-      response = link(xdr, securityHandler, client);
+      response = link(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIR) {
-      response = readdir(xdr, securityHandler, client);
+      response = readdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
-      response = readdirplus(xdr, securityHandler, client);
+      response = readdirplus(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
-      response = fsstat(xdr, securityHandler, client);
+      response = fsstat(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
-      response = fsinfo(xdr, securityHandler, client);
+      response = fsinfo(xdr, info);
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
-      response = pathconf(xdr, securityHandler, client);
+      response = pathconf(xdr,info);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, channel, xid, securityHandler, client);
+      response = commit(xdr, info);
     } else {
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,
@@ -2027,8 +2103,21 @@ public class RpcProgramNfs3 extends RpcP
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
   
-  private boolean checkAccessPrivilege(final InetAddress client,
+  private boolean checkAccessPrivilege(RpcInfo info,
+      final AccessPrivilege expected) {
+    SocketAddress remoteAddress = info.remoteAddress();
+    return checkAccessPrivilege(remoteAddress, expected);
+  }
+
+  private boolean checkAccessPrivilege(SocketAddress remoteAddress,
       final AccessPrivilege expected) {
+    // Port monitoring
+    if (!doPortMonitoring(remoteAddress)) {
+      return false;
+    }
+    
+    // Check export table
+    InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
     AccessPrivilege access = exports.getAccessPrivilege(client);
     if (access == AccessPrivilege.NONE) {
       return false;

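The RpcProgramNfs3 hunks above repeat one mechanical pattern: each public handler now takes RpcInfo, unwraps the security handler and remote address, and delegates to a package-private overload kept for the tests. A generic sketch of the shape (FOO is a placeholder procedure, not a real one):

  @Override
  public FOO3Response foo(XDR xdr, RpcInfo info) {
    SecurityHandler securityHandler = getSecurityHandler(info);
    SocketAddress remoteAddress = info.remoteAddress();
    return foo(xdr, securityHandler, remoteAddress);  // @VisibleForTesting overload
  }
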
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Fri Jun 20 05:03:56 2014
@@ -58,6 +58,7 @@ public class WriteManager {
   private boolean asyncDataServiceStarted = false;
 
   private final int maxStreams;
+  private final boolean aixCompatMode;
 
   /**
    * The time limit to wait for accumulate reordered sequential writes to the
@@ -79,9 +80,11 @@ public class WriteManager {
     return fileContextCache.put(h, ctx);
   }
   
-  WriteManager(IdUserGroup iug, final NfsConfiguration config) {
+  WriteManager(IdUserGroup iug, final NfsConfiguration config,
+      boolean aixCompatMode) {
     this.iug = iug;
     this.config = config;
+    this.aixCompatMode = aixCompatMode;
     streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY,
         NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
     LOG.info("Stream timeout is " + streamTimeout + "ms.");
@@ -175,7 +178,7 @@ public class WriteManager {
       String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
           NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId(), dfsClient, iug);
+          + fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
 
       if (!addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.info("Can't add new stream. Close it. Tell client to retry.");

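Taken together, the AIX flag flows config -> RpcProgramNfs3 -> WriteManager -> OpenFileCtx; condensed from the hunks above:

  boolean aixCompatMode = config.getBoolean(
      NfsConfigKeys.AIX_COMPAT_MODE_KEY, NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
  WriteManager writeManager = new WriteManager(iug, config, aixCompatMode);
  OpenFileCtx ctx = new OpenFileCtx(fos, latestAttr, dumpFilePath, dfsClient,
      iug, aixCompatMode);
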
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Fri Jun 20 05:03:56 2014
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -190,6 +190,29 @@ public class TestWrites {
     ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
   }
+  
+  @Test
+  public void testCheckCommitAixCompatMode() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+
+    // Last argument "true" here to enable AIX compatibility mode.
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup(new NfsConfiguration()), true);
+    
+    // Test fall-through to pendingWrites check in the event that commitOffset
+    // is greater than the number of bytes we've so far flushed.
+    Mockito.when(fos.getPos()).thenReturn((long) 2);
+    COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
+    Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED);
+    
+    // Test the case when we actually have received more bytes than we're trying
+    // to commit.
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    status = ctx.checkCommitInternal(5, null, 1, attr, false);
+    Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+  }
 
   @Test
   // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
@@ -207,7 +230,7 @@ public class TestWrites {
 
     FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
     COMMIT_STATUS ret;
-    WriteManager wm = new WriteManager(new IdUserGroup(config), config);
+    WriteManager wm = new WriteManager(new IdUserGroup(config), config, false);
     assertTrue(wm.addOpenFileStream(h, ctx));
     
     // Test inactive open file context
@@ -318,7 +341,7 @@ public class TestWrites {
       XDR createXdr = new XDR();
       createReq.serialize(createXdr);
       CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
       FileHandle handle = createRsp.getObjHandle();
 
       // Test DATA_SYNC
@@ -331,7 +354,7 @@ public class TestWrites {
       XDR writeXdr = new XDR();
       writeReq.serialize(writeXdr);
       nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
-          InetAddress.getLocalHost());
+          new InetSocketAddress("localhost", 1234));
 
       waitWrite(nfsd, handle, 60000);
 
@@ -340,7 +363,7 @@ public class TestWrites {
       XDR readXdr = new XDR();
       readReq.serialize(readXdr);
       READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
 
       assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
 
@@ -352,7 +375,7 @@ public class TestWrites {
       XDR createXdr2 = new XDR();
       createReq2.serialize(createXdr2);
       CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
       FileHandle handle2 = createRsp2.getObjHandle();
 
       WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
@@ -360,7 +383,7 @@ public class TestWrites {
       XDR writeXdr2 = new XDR();
       writeReq2.serialize(writeXdr2);
       nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
-          InetAddress.getLocalHost());
+          new InetSocketAddress("localhost", 1234));
 
       waitWrite(nfsd, handle2, 60000);
 
@@ -369,7 +392,7 @@ public class TestWrites {
       XDR readXdr2 = new XDR();
       readReq2.serialize(readXdr2);
       READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
 
       assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
       // FILE_SYNC should sync the file size

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jun 20 05:03:56 2014
@@ -254,9 +254,6 @@ Trunk (Unreleased)
     HDFS-5794. Fix the inconsistency of layout version number of 
     ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
 
-    HDFS-6375. Listing extended attributes with the search permission.
-    (Charles Lamb via wang)
-
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -452,6 +449,13 @@ Release 2.5.0 - UNRELEASED
     HDFS-6545. Finalizing rolling upgrade can make NN unavailable for a long
     duration. (kihwal)
 
+    HDFS-6530. Fix Balancer documentation.  (szetszwo)
+
+    HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. (wheat9)
+
+    HDFS-6403. Add metrics for log warnings reported by JVM pauses. (Yongjun
+    Zhang via atm)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
@@ -651,6 +655,30 @@ Release 2.5.0 - UNRELEASED
     HDFS-6527. Edit log corruption due to defered INode removal. (kihwal and
     jing9 via jing9)
 
+    HDFS-6552. add DN storage to a BlockInfo will not replace the different
+    storage from same DN. (Amir Langer via Arpit Agarwal)
+
+    HDFS-6551. Rename with OVERWRITE option may throw NPE when the target
+    file/directory is a reference INode. (jing9)
+
+    HDFS-6439. NFS should not reject NFS requests to the NULL procedure whether
+    port monitoring is enabled or not. (brandonli)
+
+    HDFS-6559. Fix wrong option "dfsadmin -rollingUpgrade start" in the
+    document. (Akira Ajisaka via Arpit Agarwal)
+
+    HDFS-6553. Add missing DeprecationDeltas for NFS Kerberos configurations
+    (Stephen Chu via brandonli)
+
+    HDFS-6563. NameNode cannot save fsimage in certain circumstances when
+    snapshots are in use. (atm)
+
+    HDFS-3848. A Bug in recoverLeaseInternal method of FSNameSystem class
+    (Hooman Peiro Sajjad  and Chen He via kihwal)
+
+    HDFS-6549. Add support for accessing the NFS gateway from the AIX NFS
+    client. (atm)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
@@ -714,6 +742,15 @@ Release 2.5.0 - UNRELEASED
     HDFS-6374. setXAttr should require the user to be the owner of the file
     or directory (Charles Lamb via wang)
 
+    HDFS-6375. Listing extended attributes with the search permission.
+    (Charles Lamb via wang)
+
+    HDFS-6492. Support create-time xattrs and atomically setting multiple
+    xattrs. (wang)
+
+    HDFS-6312. WebHdfs HA failover is broken on secure clusters. 
+    (daryn via tucu)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1603355-1604085

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java Fri Jun 20 05:03:56 2014
@@ -19,6 +19,8 @@ package org.apache.hadoop.fs;
 
 import java.util.Arrays;
 
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
@@ -105,42 +107,47 @@ public class XAttr {
   
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    result = prime * result + ((ns == null) ? 0 : ns.hashCode());
-    result = prime * result + Arrays.hashCode(value);
-    return result;
+    return new HashCodeBuilder(811, 67)
+        .append(name)
+        .append(ns)
+        .append(value)
+        .toHashCode();
   }
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
+    if (obj == null) { return false; }
+    if (obj == this) { return true; }
+    if (obj.getClass() != getClass()) {
       return false;
     }
-    XAttr other = (XAttr) obj;
-    if (name == null) {
-      if (other.name != null) {
-        return false;
-      }
-    } else if (!name.equals(other.name)) {
-      return false;
-    }
-    if (ns != other.ns) {
-      return false;
-    }
-    if (!Arrays.equals(value, other.value)) {
+    XAttr rhs = (XAttr) obj;
+    return new EqualsBuilder()
+        .append(ns, rhs.ns)
+        .append(name, rhs.name)
+        .append(value, rhs.value)
+        .isEquals();
+  }
+
+  /**
+   * Similar to {@link #equals(Object)}, except ignores the XAttr value.
+   *
+   * @param obj the object to compare against
+   * @return true if the XAttrs are equal, ignoring the XAttr value
+   */
+  public boolean equalsIgnoreValue(Object obj) {
+    if (obj == null) { return false; }
+    if (obj == this) { return true; }
+    if (obj.getClass() != getClass()) {
       return false;
     }
-    return true;
+    XAttr rhs = (XAttr) obj;
+    return new EqualsBuilder()
+        .append(ns, rhs.ns)
+        .append(name, rhs.name)
+        .isEquals();
   }
-  
+
   @Override
   public String toString() {
     return "XAttr [ns=" + ns + ", name=" + name + ", value="

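The rewritten hashCode()/equals() above delegate to the commons-lang builders. A quick illustration of the pattern (a hypothetical Pair class, not part of this patch): the same fields are appended in both methods so the equals/hashCode contract holds, and append() handles nulls and arrays such as the byte[] value without the hand-rolled null checks the patch deletes:

    import org.apache.commons.lang.builder.EqualsBuilder;
    import org.apache.commons.lang.builder.HashCodeBuilder;

    public class Pair {
      private final String key;
      private final byte[] value;

      public Pair(String key, byte[] value) {
        this.key = key;
        this.value = value;
      }

      @Override
      public int hashCode() {
        // Seeds must be non-zero odd numbers, as with XAttr's (811, 67).
        return new HashCodeBuilder(17, 37)
            .append(key)
            .append(value)
            .toHashCode();
      }

      @Override
      public boolean equals(Object obj) {
        if (obj == null) { return false; }
        if (obj == this) { return true; }
        if (obj.getClass() != getClass()) { return false; }
        Pair rhs = (Pair) obj;
        return new EqualsBuilder()
            .append(key, rhs.key)
            .append(value, rhs.value)
            .isEquals();
      }
    }
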
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Jun 20 05:03:56 2014
@@ -2093,6 +2093,9 @@ public class PBHelper {
   
   public static List<XAttrProto> convertXAttrProto(
       List<XAttr> xAttrSpec) {
+    if (xAttrSpec == null) {
+      return Lists.newArrayListWithCapacity(0);
+    }
     ArrayList<XAttrProto> xAttrs = Lists.newArrayListWithCapacity(
         xAttrSpec.size());
     for (XAttr a : xAttrSpec) {

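The convertXAttrProto() guard above turns a null spec into an empty list before size() is called. The same idiom in isolation, as a hedged sketch (copyOrEmpty is a hypothetical helper, not a PBHelper method):

    import java.util.List;

    import com.google.common.collect.Lists;

    public class NullGuardSketch {
      // Return an empty list for null input instead of NPE-ing on size().
      static <T> List<T> copyOrEmpty(List<T> input) {
        if (input == null) {
          return Lists.newArrayListWithCapacity(0);
        }
        return Lists.newArrayList(input);
      }
    }
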
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Fri Jun 20 05:03:56 2014
@@ -203,7 +203,7 @@ public class BlockInfo extends Block imp
       } else {
         // The block is on the DN but belongs to a different storage.
         // Update our state.
-        removeStorage(storage);
+        removeStorage(getStorageInfo(idx));
         added = false;      // Just updating storage. Return false.
       }
     }

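The one-line fix above (HDFS-6552) removes the storage already recorded at idx rather than the incoming storage object, which belongs to a different storage on the same datanode. A simplified, hypothetical model of that logic, with a flat list standing in for BlockInfo's internal storage bookkeeping:

    import java.util.ArrayList;
    import java.util.List;

    public class StorageReplaceSketch {
      static class Storage {
        final String datanodeId;
        final String storageId;
        Storage(String datanodeId, String storageId) {
          this.datanodeId = datanodeId;
          this.storageId = storageId;
        }
      }

      // Returns true only if the block is newly added on this datanode.
      static boolean addStorage(List<Storage> entries, Storage incoming) {
        for (int idx = 0; idx < entries.size(); idx++) {
          Storage recorded = entries.get(idx);
          if (recorded.datanodeId.equals(incoming.datanodeId)) {
            if (recorded.storageId.equals(incoming.storageId)) {
              return false;            // same storage, nothing to update
            }
            // Block moved to a different storage on the same DN: evict the
            // *recorded* entry (the fix), then add the replacement.
            entries.remove(idx);
            entries.add(incoming);
            return false;              // just updating storage
          }
        }
        entries.add(incoming);         // first time on this datanode
        return true;
      }
    }
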
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 20 05:03:56 2014
@@ -778,7 +778,8 @@ public class DataNode extends Configured
     initIpcServer(conf);
 
     metrics = DataNodeMetrics.create(conf, getDisplayName());
-
+    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+    
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(conf);
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Fri Jun 20 05:03:56 2014
@@ -90,13 +90,15 @@ public class DataNodeMetrics {
   final MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
   @Metric MutableRate sendDataPacketTransferNanos;
   final MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
-  
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
-
-  public DataNodeMetrics(String name, String sessionId, int[] intervals) {
+  JvmMetrics jvmMetrics = null;
+  
+  public DataNodeMetrics(String name, String sessionId, int[] intervals,
+      final JvmMetrics jvmMetrics) {
     this.name = name;
+    this.jvmMetrics = jvmMetrics;    
     registry.tag(SessionId, sessionId);
     
     final int len = intervals.length;
@@ -131,7 +133,7 @@ public class DataNodeMetrics {
   public static DataNodeMetrics create(Configuration conf, String dnName) {
     String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    JvmMetrics.create("DataNode", sessionId, ms);
+    JvmMetrics jm = JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
         ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() 
             : dnName.replace(':', '-'));
@@ -141,11 +143,15 @@ public class DataNodeMetrics {
         conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
     
     return ms.register(name, null, new DataNodeMetrics(name, sessionId,
-        intervals));
+        intervals, jm));
   }
 
   public String name() { return name; }
 
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+  
   public void addHeartbeat(long latency) {
     heartbeats.add(latency);
   }

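Together with the DataNode.java hunk above, the new getJvmMetrics() accessor lets a pause monitor feed JVM-pause warnings into the metrics system (HDFS-6403). A rough usage sketch, assuming the JvmPauseMonitor(Configuration) constructor of this era's trunk and an illustrative datanode name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
    import org.apache.hadoop.util.JvmPauseMonitor;

    public class PauseMonitorWiring {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();

        // "dn-example" is a stand-in display name, not from the patch.
        DataNodeMetrics metrics = DataNodeMetrics.create(conf, "dn-example");
        metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
      }
    }
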
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Fri Jun 20 05:03:56 2014
@@ -252,7 +252,7 @@ class Checkpointer extends Daemon {
     
     backupNode.namesystem.writeLock();
     try {
-      backupNode.namesystem.dir.setReady();
+      backupNode.namesystem.setImageLoaded();
       if(backupNode.namesystem.getBlocksTotal() > 0) {
         backupNode.namesystem.setBlockTotal();
       }

Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jun 20 05:03:56 2014
@@ -26,11 +26,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
+import java.util.ListIterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -84,15 +83,14 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-/*************************************************
- * FSDirectory stores the filesystem directory state.
- * It handles writing/loading values to disk, and logging
- * changes as we go.
- *
- * It keeps the filename->blockset mapping always-current
- * and logged to disk.
- * 
- *************************************************/
+/**
+ * Both FSDirectory and FSNamesystem manage the state of the namespace.
+ * FSDirectory is a pure in-memory data structure, all of whose operations
+ * happen entirely in memory. In contrast, FSNamesystem persists the operations
+ * to the disk.
+ * @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem
+ **/
+@InterfaceAudience.Private
 public class FSDirectory implements Closeable {
   private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
@@ -121,7 +119,6 @@ public class FSDirectory implements Clos
   INodeDirectory rootDir;
   FSImage fsImage;  
   private final FSNamesystem namesystem;
-  private volatile boolean ready = false;
   private volatile boolean skipQuotaCheck = false; //skip while consuming edits
   private final int maxComponentLength;
   private final int maxDirItems;
@@ -133,7 +130,6 @@ public class FSDirectory implements Clos
 
   // lock to protect the directory and BlockMap
   private final ReentrantReadWriteLock dirLock;
-  private final Condition cond;
 
   // utility methods to acquire and release read lock and write lock
   void readLock() {
@@ -176,7 +172,6 @@ public class FSDirectory implements Clos
 
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
-    this.cond = dirLock.writeLock().newCondition();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir);
     this.fsImage = fsImage;
@@ -233,38 +228,6 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Notify that loading of this FSDirectory is complete, and
-   * it is ready for use 
-   */
-  void imageLoadComplete() {
-    Preconditions.checkState(!ready, "FSDirectory already loaded");
-    setReady();
-  }
-
-  void setReady() {
-    if(ready) return;
-    writeLock();
-    try {
-      setReady(true);
-      this.nameCache.initialized();
-      cond.signalAll();
-    } finally {
-      writeUnlock();
-    }
-  }
-  
-  //This is for testing purposes only
-  @VisibleForTesting
-  boolean isReady() {
-    return ready;
-  }
-
-  // exposed for unit tests
-  protected void setReady(boolean flag) {
-    ready = flag;
-  }
-
-  /**
    * Shutdown the filestore
    */
   @Override
@@ -272,22 +235,12 @@ public class FSDirectory implements Clos
     fsImage.close();
   }
 
-  /**
-   * Block until the object is ready to be used.
-   */
-  void waitForReady() {
-    if (!ready) {
-      writeLock();
-      try {
-        while (!ready) {
-          try {
-            cond.await(5000, TimeUnit.MILLISECONDS);
-          } catch (InterruptedException ignored) {
-          }
-        }
-      } finally {
-        writeUnlock();
-      }
+  void markNameCacheInitialized() {
+    writeLock();
+    try {
+      nameCache.initialized();
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -313,7 +266,6 @@ public class FSDirectory implements Clos
       String clientMachine, DatanodeDescriptor clientNode)
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
-    waitForReady();
 
     long modTime = now();
     INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
@@ -343,6 +295,7 @@ public class FSDirectory implements Clos
                             String path, 
                             PermissionStatus permissions,
                             List<AclEntry> aclEntries,
+                            List<XAttr> xAttrs,
                             short replication,
                             long modificationTime,
                             long atime,
@@ -369,6 +322,10 @@ public class FSDirectory implements Clos
           AclStorage.updateINodeAcl(newNode, aclEntries,
             Snapshot.CURRENT_STATE_ID);
         }
+        if (xAttrs != null) {
+          XAttrStorage.updateINodeXAttrs(newNode, xAttrs,
+              Snapshot.CURRENT_STATE_ID);
+        }
         return newNode;
       }
     } catch (IOException e) {
@@ -386,8 +343,6 @@ public class FSDirectory implements Clos
    */
   BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
       DatanodeStorageInfo[] targets) throws IOException {
-    waitForReady();
-
     writeLock();
     try {
       final INodeFile fileINode = inodesInPath.getLastINode().asFile();
@@ -425,8 +380,6 @@ public class FSDirectory implements Clos
   boolean removeBlock(String path, INodeFile fileNode, Block block)
       throws IOException {
     Preconditions.checkArgument(fileNode.isUnderConstruction());
-    waitForReady();
-
     writeLock();
     try {
       return unprotectedRemoveBlock(path, fileNode, block);
@@ -470,7 +423,6 @@ public class FSDirectory implements Clos
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
           +src+" to "+dst);
     }
-    waitForReady();
     writeLock();
     try {
       if (!unprotectedRenameTo(src, dst, mtime))
@@ -493,7 +445,6 @@ public class FSDirectory implements Clos
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
           + " to " + dst);
     }
-    waitForReady();
     writeLock();
     try {
       if (unprotectedRenameTo(src, dst, mtime, options)) {
@@ -891,9 +842,10 @@ public class FSDirectory implements Clos
     
     boolean undoRemoveDst = false;
     INode removedDst = null;
+    long removedNum = 0;
     try {
       if (dstInode != null) { // dst exists remove it
-        if (removeLastINode(dstIIP) != -1) {
+        if ((removedNum = removeLastINode(dstIIP)) != -1) {
           removedDst = dstIIP.getLastINode();
           undoRemoveDst = true;
         }
@@ -933,13 +885,15 @@ public class FSDirectory implements Clos
         long filesDeleted = -1;
         if (removedDst != null) {
           undoRemoveDst = false;
-          BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-          List<INode> removedINodes = new ChunkedArrayList<INode>();
-          filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
-              dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes, true)
-              .get(Quota.NAMESPACE);
-          getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
-              removedINodes);
+          if (removedNum > 0) {
+            BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+            List<INode> removedINodes = new ChunkedArrayList<INode>();
+            filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+                dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
+                true).get(Quota.NAMESPACE);
+            getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
+                removedINodes);
+          }
         }
 
         if (snapshottableDirs.size() > 0) {
@@ -1022,7 +976,6 @@ public class FSDirectory implements Clos
   Block[] setReplication(String src, short replication, short[] blockRepls)
       throws QuotaExceededException, UnresolvedLinkException,
       SnapshotAccessControlException {
-    waitForReady();
     writeLock();
     try {
       return unprotectedSetReplication(src, replication, blockRepls);
@@ -1145,7 +1098,6 @@ public class FSDirectory implements Clos
     writeLock();
     try {
       // actual move
-      waitForReady();
       unprotectedConcat(target, srcs, timestamp);
     } finally {
       writeUnlock();
@@ -1228,7 +1180,6 @@ public class FSDirectory implements Clos
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
-    waitForReady();
     final long filesRemoved;
     writeLock();
     try {
@@ -1701,7 +1652,7 @@ public class FSDirectory implements Clos
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
-    if (!ready) {
+    if (!namesystem.isImageLoaded()) {
       //still initializing. do not check or update quotas.
       return;
     }
@@ -1894,7 +1845,7 @@ public class FSDirectory implements Clos
    */
   private void verifyQuotaForRename(INode[] src, INode[] dst)
       throws QuotaExceededException {
-    if (!ready || skipQuotaCheck) {
+    if (!namesystem.isImageLoaded() || skipQuotaCheck) {
       // Do not check quota if edits log is still being processed
       return;
     }
@@ -1950,7 +1901,7 @@ public class FSDirectory implements Clos
   void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException {
     if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) {
       String s = "\"" + HdfsConstants.DOT_SNAPSHOT_DIR + "\" is a reserved name.";
-      if (!ready) {
+      if (!namesystem.isImageLoaded()) {
         s += "  Please rename it before upgrade.";
       }
       throw new HadoopIllegalArgumentException(s);
@@ -1977,7 +1928,7 @@ public class FSDirectory implements Clos
           getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
       final PathComponentTooLongException e = new PathComponentTooLongException(
           maxComponentLength, length, p, DFSUtil.bytes2String(childName));
-      if (ready) {
+      if (namesystem.isImageLoaded()) {
         throw e;
       } else {
         // Do not throw if edits log is still being processed
@@ -2001,7 +1952,7 @@ public class FSDirectory implements Clos
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
           = new MaxDirectoryItemsExceededException(maxDirItems, count);
-      if (ready) {
+      if (namesystem.isImageLoaded()) {
         e.setPathName(getFullPathName(pathComponents, pos - 1));
         throw e;
       } else {
@@ -2337,7 +2288,6 @@ public class FSDirectory implements Clos
   void reset() {
     writeLock();
     try {
-      setReady(false);
       rootDir = createRoot(getFSNamesystem());
       inodeMap.clear();
       addToInodeMap(rootDir);
@@ -2619,101 +2569,171 @@ public class FSDirectory implements Clos
     }
   }
 
-  XAttr removeXAttr(String src, XAttr xAttr) throws IOException {
+  /**
+   * Removes a list of XAttrs from an inode at a path.
+   *
+   * @param src path of inode
+   * @param toRemove XAttrs to be removed
+   * @return List of XAttrs that were removed
+   * @throws IOException if the inode does not exist or the quota is exceeded
+   */
+  List<XAttr> removeXAttrs(final String src, final List<XAttr> toRemove)
+      throws IOException {
     writeLock();
     try {
-      return unprotectedRemoveXAttr(src, xAttr);
+      return unprotectedRemoveXAttrs(src, toRemove);
     } finally {
       writeUnlock();
     }
   }
-  
-  XAttr unprotectedRemoveXAttr(String src,
-      XAttr xAttr) throws IOException {
+
+  List<XAttr> unprotectedRemoveXAttrs(final String src,
+      final List<XAttr> toRemove) throws IOException {
     assert hasWriteLock();
     INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
-    List<XAttr> newXAttrs = filterINodeXAttr(existingXAttrs, xAttr);
+    List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
+    List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
+        removedXAttrs);
     if (existingXAttrs.size() != newXAttrs.size()) {
       XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
-      return xAttr;
+      return removedXAttrs;
     }
     return null;
   }
-  
-  List<XAttr> filterINodeXAttr(List<XAttr> existingXAttrs, 
-      XAttr xAttr) throws QuotaExceededException {
-    if (existingXAttrs == null || existingXAttrs.isEmpty()) {
+
+  /**
+   * Filter XAttrs from a list of existing XAttrs. Removes matched XAttrs from
+   * toFilter and puts them into filtered. Upon completion,
+   * toFilter contains the filter XAttrs that were not found, while
+   * filtered contains the XAttrs that were found.
+   *
+   * @param existingXAttrs Existing XAttrs to be filtered
+   * @param toFilter XAttrs to filter from the existing XAttrs
+   * @param filtered Return parameter, XAttrs that were filtered
+   * @return List of XAttrs that does not contain filtered XAttrs
+   */
+  @VisibleForTesting
+  List<XAttr> filterINodeXAttrs(final List<XAttr> existingXAttrs,
+      final List<XAttr> toFilter, final List<XAttr> filtered) {
+    if (existingXAttrs == null || existingXAttrs.isEmpty() ||
+        toFilter == null || toFilter.isEmpty()) {
       return existingXAttrs;
     }
-    
-    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(existingXAttrs.size());
+
+    // Populate a new list with XAttrs that pass the filter
+    List<XAttr> newXAttrs =
+        Lists.newArrayListWithCapacity(existingXAttrs.size());
     for (XAttr a : existingXAttrs) {
-      if (!(a.getNameSpace() == xAttr.getNameSpace()
-          && a.getName().equals(xAttr.getName()))) {
-        xAttrs.add(a);
+      boolean add = true;
+      for (ListIterator<XAttr> it = toFilter.listIterator(); it.hasNext();) {
+        XAttr filter = it.next();
+        if (a.equalsIgnoreValue(filter)) {
+          add = false;
+          it.remove();
+          filtered.add(filter);
+          break;
+        }
+      }
+      if (add) {
+        newXAttrs.add(a);
       }
     }
-    
-    return xAttrs;
+
+    return newXAttrs;
   }
   
-  void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
-          throws IOException {
+  void setXAttrs(final String src, final List<XAttr> xAttrs,
+      final EnumSet<XAttrSetFlag> flag) throws IOException {
     writeLock();
     try {
-      unprotectedSetXAttr(src, xAttr, flag);
+      unprotectedSetXAttrs(src, xAttrs, flag);
     } finally {
       writeUnlock();
     }
   }
   
-  void unprotectedSetXAttr(String src, XAttr xAttr, 
-      EnumSet<XAttrSetFlag> flag) throws IOException {
+  void unprotectedSetXAttrs(final String src, final List<XAttr> xAttrs,
+      final EnumSet<XAttrSetFlag> flag)
+      throws QuotaExceededException, IOException {
     assert hasWriteLock();
     INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
     INode inode = resolveLastINode(src, iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
-    List<XAttr> newXAttrs = setINodeXAttr(existingXAttrs, xAttr, flag);
+    List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, xAttrs, flag);
     XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
   }
-  
-  List<XAttr> setINodeXAttr(List<XAttr> existingXAttrs, XAttr xAttr, 
-      EnumSet<XAttrSetFlag> flag) throws QuotaExceededException, IOException {
-    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(
-        existingXAttrs != null ? existingXAttrs.size() + 1 : 1);
+
+  List<XAttr> setINodeXAttrs(final List<XAttr> existingXAttrs,
+      final List<XAttr> toSet, final EnumSet<XAttrSetFlag> flag)
+      throws IOException {
+    // Check for duplicate XAttrs in toSet
+    // We need to use a custom comparator, so using a HashSet is not suitable
+    for (int i = 0; i < toSet.size(); i++) {
+      for (int j = i + 1; j < toSet.size(); j++) {
+        if (toSet.get(i).equalsIgnoreValue(toSet.get(j))) {
+          throw new IOException("Cannot specify the same XAttr to be set " +
+              "more than once");
+        }
+      }
+    }
+
+    // Count the current number of user-visible XAttrs for limit checking
     int userVisibleXAttrsNum = 0; // Number of user visible xAttrs
-    boolean exist = false;
+
+    // The XAttr list is copied to an exactly-sized array when it's stored,
+    // so there's no need to size it precisely here.
+    int newSize = (existingXAttrs != null) ? existingXAttrs.size() : 0;
+    newSize += toSet.size();
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(newSize);
+
+    // Check if the XAttr already exists to validate with the provided flag
+    for (XAttr xAttr: toSet) {
+      boolean exist = false;
+      if (existingXAttrs != null) {
+        for (XAttr a : existingXAttrs) {
+          if (a.equalsIgnoreValue(xAttr)) {
+            exist = true;
+            break;
+          }
+        }
+      }
+      XAttrSetFlag.validate(xAttr.getName(), exist, flag);
+      // add the new XAttr since it passed validation
+      xAttrs.add(xAttr);
+      if (isUserVisible(xAttr)) {
+        userVisibleXAttrsNum++;
+      }
+    }
+
+    // Add the existing xattrs back in, if they weren't already set
     if (existingXAttrs != null) {
-      for (XAttr a: existingXAttrs) {
-        if ((a.getNameSpace() == xAttr.getNameSpace()
-            && a.getName().equals(xAttr.getName()))) {
-          exist = true;
-        } else {
-          xAttrs.add(a);
-          
-          if (isUserVisible(a)) {
+      for (XAttr existing : existingXAttrs) {
+        boolean alreadySet = false;
+        for (XAttr set : toSet) {
+          if (set.equalsIgnoreValue(existing)) {
+            alreadySet = true;
+            break;
+          }
+        }
+        if (!alreadySet) {
+          xAttrs.add(existing);
+          if (isUserVisible(existing)) {
             userVisibleXAttrsNum++;
           }
         }
       }
     }
-    
-    XAttrSetFlag.validate(xAttr.getName(), exist, flag);
-    xAttrs.add(xAttr);
-    
-    if (isUserVisible(xAttr)) {
-      userVisibleXAttrsNum++;
-    }
-    
+
     if (userVisibleXAttrsNum > inodeXAttrsLimit) {
       throw new IOException("Cannot add additional XAttr to inode, "
           + "would exceed limit of " + inodeXAttrsLimit);
     }
-    
+
     return xAttrs;
   }
   

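filterINodeXAttrs above deliberately avoids a HashSet (matching is by namespace and name only, via equalsIgnoreValue), so it does a nested scan, destructively draining toFilter as matches are found. A standalone sketch of that algorithm, with plain strings standing in for XAttrs:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class XAttrFilterSketch {
      static List<String> filter(List<String> existing, List<String> toFilter,
          List<String> filtered) {
        List<String> kept = new ArrayList<String>(existing.size());
        for (String attr : existing) {
          boolean keep = true;
          for (Iterator<String> it = toFilter.iterator(); it.hasNext();) {
            String candidate = it.next();
            if (attr.equals(candidate)) { // stands in for equalsIgnoreValue()
              keep = false;
              it.remove();                // drained: won't match again later
              filtered.add(candidate);
              break;
            }
          }
          if (keep) {
            kept.add(attr);
          }
        }
        return kept;                      // existing attrs that survived
      }
    }
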
Modified: hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1604086&r1=1604085&r2=1604086&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jun 20 05:03:56 2014
@@ -700,12 +700,19 @@ public class FSEditLog implements LogsPu
       .setBlocks(newNode.getBlocks())
       .setPermissionStatus(permissions)
       .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
-      .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
+      .setClientMachine(
+          newNode.getFileUnderConstructionFeature().getClientMachine());
 
     AclFeature f = newNode.getAclFeature();
     if (f != null) {
       op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
     }
+
+    XAttrFeature x = newNode.getXAttrFeature();
+    if (x != null) {
+      op.setXAttrs(x.getXAttrs());
+    }
+
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
@@ -761,6 +768,11 @@ public class FSEditLog implements LogsPu
     if (f != null) {
       op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
     }
+
+    XAttrFeature x = newNode.getXAttrFeature();
+    if (x != null) {
+      op.setXAttrs(x.getXAttrs());
+    }
     logEdit(op);
   }
   
@@ -1054,18 +1066,18 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  void logSetXAttr(String src, XAttr xAttr, boolean toLogRpcIds) {
+  void logSetXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
     final SetXAttrOp op = SetXAttrOp.getInstance();
     op.src = src;
-    op.xAttr = xAttr;
+    op.xAttrs = xAttrs;
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
-  void logRemoveXAttr(String src, XAttr xAttr) {
+  void logRemoveXAttrs(String src, List<XAttr> xAttrs) {
     final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
     op.src = src;
-    op.xAttr = xAttr;
+    op.xAttrs = xAttrs;
     logEdit(op);
   }
 


