hadoop-hdfs-commits mailing list archives

From: su...@apache.org
Subject: svn commit: r1609878 [2/9] - in /hadoop/common/branches/YARN-1051/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs...
Date: Sat, 12 Jul 2014 02:24:55 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Sat Jul 12 02:24:40 2014
@@ -95,6 +95,7 @@ class OpenFileCtx {
    */
   private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
+  private final boolean aixCompatMode;
   
   // It's updated after each sync to HDFS
   private Nfs3FileAttributes latestAttr;
@@ -199,8 +200,15 @@ class OpenFileCtx {
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
       String dumpFilePath, DFSClient client, IdUserGroup iug) {
+    this(fos, latestAttr, dumpFilePath, client, iug, false);
+  }
+  
+  OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
+      String dumpFilePath, DFSClient client, IdUserGroup iug,
+      boolean aixCompatMode) {
     this.fos = fos;
     this.latestAttr = latestAttr;
+    this.aixCompatMode = aixCompatMode;
     // We use the ReverseComparatorOnMin as the comparator of the map. In this
     // way, we first dump the data with larger offset. In the meanwhile, we
     // retrieve the last element to write back to HDFS.
@@ -780,15 +788,29 @@ class OpenFileCtx {
     }
 
     if (commitOffset > 0) {
-      if (commitOffset > flushed) {
-        if (!fromRead) {
-          CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
-              preOpAttr);
-          pendingCommits.put(commitOffset, commitCtx);
+      if (aixCompatMode) {
+        // The AIX NFS client misinterprets RFC-1813 and will always send 4096
+        // for the commitOffset even if fewer bytes than that have ever (or will
+        // ever) be sent by the client. So, if in AIX compatibility mode, we
+        // will always DO_SYNC if all of the bytes being committed have already
+        // been flushed. Otherwise we fall through to the logic below, which
+        // checks for pending writes in the case that we are being asked to
+        // commit more bytes than have so far been flushed. See HDFS-6549 for
+        // more info.
+        if (commitOffset <= flushed) {
+          return COMMIT_STATUS.COMMIT_DO_SYNC;
         }
-        return COMMIT_STATUS.COMMIT_WAIT;
       } else {
-        return COMMIT_STATUS.COMMIT_DO_SYNC;
+        if (commitOffset > flushed) {
+          if (!fromRead) {
+            CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+                preOpAttr);
+            pendingCommits.put(commitOffset, commitCtx);
+          }
+          return COMMIT_STATUS.COMMIT_WAIT;
+        } else {
+          return COMMIT_STATUS.COMMIT_DO_SYNC;
+        } 
       }
     }
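
This hunk changes the COMMIT handling in OpenFileCtx. A minimal sketch of the resulting decision, pulled out of the method for readability (the helper below is illustrative only, not the real method; `flushed` stands for the stream's flushed byte count, and the CommitCtx queueing is elided):

    // Sketch of the commit decision after this change.
    // AIX mode: a commitOffset at or below the flushed count syncs right away;
    // a larger one falls through to the pending-write checks further down.
    // Default mode: a commitOffset beyond the flushed count queues the commit
    // (unless it came from a read) and tells the caller to wait.
    static COMMIT_STATUS decideCommit(long commitOffset, long flushed,
        boolean aixCompatMode, boolean fromRead) {
      if (commitOffset > 0) {
        if (aixCompatMode) {
          if (commitOffset <= flushed) {
            return COMMIT_STATUS.COMMIT_DO_SYNC;
          }
          // else: fall through to the pending-write checks
        } else if (commitOffset > flushed) {
          if (!fromRead) {
            // the real code queues a CommitCtx keyed by commitOffset here
          }
          return COMMIT_STATUS.COMMIT_WAIT;
        } else {
          return COMMIT_STATUS.COMMIT_DO_SYNC;
        }
      }
      // the real method then inspects pendingWrites; with none outstanding it
      // reports COMMIT_FINISHED (see the new test in TestWrites below)
      return COMMIT_STATUS.COMMIT_FINISHED;
    }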
 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Sat Jul 12 02:24:40 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
@@ -152,6 +153,7 @@ public class RpcProgramNfs3 extends RpcP
   private final short replication;
   private final long blockSize;
   private final int bufferSize;
+  private final boolean aixCompatMode;
   private Statistics statistics;
   private String writeDumpDir; // The dir to save dump files
   
@@ -169,8 +171,11 @@ public class RpcProgramNfs3 extends RpcP
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup(config);
     
+    aixCompatMode = config.getBoolean(
+        NfsConfigKeys.AIX_COMPAT_MODE_KEY,
+        NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
     exports = NfsExports.getInstance(config);
-    writeManager = new WriteManager(iug, config);
+    writeManager = new WriteManager(iug, config, aixCompatMode);
     clientCache = new DFSClientCache(config);
     replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
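
The new flag is read once at construction time. A minimal sketch of turning it on programmatically, using the same constant the constructor reads (in practice the property would be set in the gateway's configuration file; the constructor call below is illustrative, as its exact signature is outside this hunk):

    // Sketch: enable AIX compatibility mode for the NFS gateway.
    NfsConfiguration config = new NfsConfiguration();
    config.setBoolean(NfsConfigKeys.AIX_COMPAT_MODE_KEY, true);
    // ... then construct the gateway with this config, e.g. new RpcProgramNfs3(config)
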
@@ -230,15 +235,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -322,9 +327,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -370,7 +375,7 @@ public class RpcProgramNfs3 extends RpcP
       }
       
       // check the write access privilege
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             preOpWcc, preOpAttr));
       }
@@ -398,15 +403,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -460,15 +465,15 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public ACCESS3Response access(XDR xdr, RpcInfo info) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -499,7 +504,8 @@ public class RpcProgramNfs3 extends RpcP
         return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
       }
       int access = Nfs3Utils.getAccessRightsForUserGroup(
-          securityHandler.getUid(), securityHandler.getGid(), attrs);
+          securityHandler.getUid(), securityHandler.getGid(),
+          securityHandler.getAuxGids(), attrs);
       
       return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
     } catch (RemoteException r) {
@@ -519,15 +525,16 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READLINK3Response readlink(XDR xdr, RpcInfo info) {
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -591,12 +598,19 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public READ3Response read(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public READ3Response read(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return read(xdr, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  READ3Response read(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     final String userName = securityHandler.getUser();
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
@@ -646,7 +660,8 @@ public class RpcProgramNfs3 extends RpcP
         return new READ3Response(Nfs3Status.NFS3ERR_NOENT);
       }
       int access = Nfs3Utils.getAccessRightsForUserGroup(
-          securityHandler.getUid(), securityHandler.getGid(), attrs);
+          securityHandler.getUid(), securityHandler.getGid(),
+          securityHandler.getAuxGids(), attrs);
       if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
         eof = offset < attrs.getSize() ? false : true;
         return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
@@ -715,8 +730,17 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public WRITE3Response write(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public WRITE3Response write(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  WRITE3Response write(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
@@ -758,7 +782,7 @@ public class RpcProgramNfs3 extends RpcP
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
@@ -791,8 +815,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public CREATE3Response create(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return create(xdr, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
@@ -838,7 +869,7 @@ public class RpcProgramNfs3 extends RpcP
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
                 preOpDirAttr));
@@ -875,7 +906,8 @@ public class RpcProgramNfs3 extends RpcP
       
       // Add open stream
       OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
-          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
+          aixCompatMode);
       fileHandle = new FileHandle(postOpObjAttr.getFileId());
       if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.warn("Can't add more stream, close it."
@@ -922,9 +954,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -960,7 +992,7 @@ public class RpcProgramNfs3 extends RpcP
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
 
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
       }
@@ -1012,15 +1044,15 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response mknod(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  @Override
+  public READDIR3Response mknod(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
   
   @Override
-  public REMOVE3Response remove(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public REMOVE3Response remove(XDR xdr, RpcInfo info) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1093,9 +1125,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1129,7 +1161,7 @@ public class RpcProgramNfs3 extends RpcP
       
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           preOpDirAttr);
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
       }
 
@@ -1175,9 +1207,9 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RENAME3Response rename(XDR xdr, RpcInfo info) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1221,7 +1253,7 @@ public class RpcProgramNfs3 extends RpcP
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
             fromPreOpAttr);
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1263,15 +1295,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1322,8 +1354,8 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READDIR3Response link(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
@@ -1351,11 +1383,16 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
+  public READDIR3Response readdir(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return readdir(xdr, securityHandler, remoteAddress);
+  }
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+      SocketAddress remoteAddress) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
@@ -1408,9 +1445,24 @@ public class RpcProgramNfs3 extends RpcP
       }
       long cookieVerf = request.getCookieVerf();
       if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
-        LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
-            + " dir cookieVerf:" + dirStatus.getModificationTime());
-        return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        if (aixCompatMode) {
+          // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+          // the same cookieverf value even across VFS-level readdir calls,
+          // instead of getting a new cookieverf for every VFS-level readdir
+          // call, and reusing the cookieverf only in the event that multiple
+          // incremental NFS-level readdir calls must be made to fetch all of
+          // the directory entries. This means that whenever a readdir call is
+          // made by an AIX NFS client for a given directory, and that directory
+          // is subsequently modified, thus changing its mtime, no later readdir
+          // calls will succeed from AIX for that directory until the FS is
+          // unmounted/remounted. See HDFS-6549 for more info.
+          LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+              "mismatches.");
+        } else {
+          LOG.error("CookieVerf mismatch. request cookieVerf: " + cookieVerf
+              + " dir cookieVerf: " + dirStatus.getModificationTime());
+          return new READDIR3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        }
       }
 
       if (cookie == 0) {
@@ -1491,9 +1543,17 @@ public class RpcProgramNfs3 extends RpcP
         dirStatus.getModificationTime(), dirList);
   }
 
-  public READDIRPLUS3Response readdirplus(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+  @Override
+  public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return readdirplus(xdr, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
     
@@ -1550,9 +1610,22 @@ public class RpcProgramNfs3 extends RpcP
       }
       long cookieVerf = request.getCookieVerf();
       if ((cookieVerf != 0) && (cookieVerf != dirStatus.getModificationTime())) {
-        LOG.error("CookierVerf mismatch. request cookierVerf:" + cookieVerf
-            + " dir cookieVerf:" + dirStatus.getModificationTime());
-        return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        if (aixCompatMode) {
+          // The AIX NFS client misinterprets RFC-1813 and will repeatedly send
+          // the same cookieverf value even across VFS-level readdir calls,
+          // instead of getting a new cookieverf for every VFS-level readdir
+          // call. This means that whenever a readdir call is made by an AIX NFS
+          // client for a given directory, and that directory is subsequently
+          // modified, thus changing its mtime, no later readdir calls will
+          // succeed for that directory from AIX until the FS is
+          // unmounted/remounted. See HDFS-6549 for more info.
+          LOG.warn("AIX compatibility mode enabled, ignoring cookieverf " +
+              "mismatches.");
+        } else {
+          LOG.error("cookieverf mismatch. request cookieverf: " + cookieVerf
+              + " dir cookieverf: " + dirStatus.getModificationTime());
+          return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_BAD_COOKIE);
+        }
       }
 
       if (cookie == 0) {
@@ -1643,15 +1716,15 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1711,15 +1784,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1769,15 +1842,15 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1816,9 +1889,11 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, RpcInfo info) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1849,7 +1924,7 @@ public class RpcProgramNfs3 extends RpcP
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Constant.WRITE_COMMIT_VERF);
@@ -1859,8 +1934,10 @@ public class RpcProgramNfs3 extends RpcP
           : (request.getOffset() + request.getCount());
       
       // Insert commit as an async request
-      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
-          preOpAttr);
+      RpcCall rpcCall = (RpcCall) info.header();
+      int xid = rpcCall.getXid();
+      writeManager.handleCommit(dfsClient, handle, commitOffset,
+          info.channel(), xid, preOpAttr);
       return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
@@ -1885,11 +1962,16 @@ public class RpcProgramNfs3 extends RpcP
       return null;
     }
   }
+
+  private SecurityHandler getSecurityHandler(RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
+    return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
+  }
   
   @Override
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     RpcCall rpcCall = (RpcCall) info.header();
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
     byte[] data = new byte[info.data().readableBytes()];
     info.data().readBytes(data);
@@ -1897,9 +1979,8 @@ public class RpcProgramNfs3 extends RpcP
     XDR out = new XDR();
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
         .getAddress();
-    Channel channel = info.channel();
-
     Credentials credentials = rpcCall.getCredential();
+    
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -1937,27 +2018,24 @@ public class RpcProgramNfs3 extends RpcP
       }
     }
     
-    SecurityHandler securityHandler = getSecurityHandler(credentials,
-        rpcCall.getVerifier());
-    
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
-      response = getattr(xdr, securityHandler, client);
+      response = getattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
-      response = setattr(xdr, securityHandler, client);
+      response = setattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
-      response = lookup(xdr, securityHandler, client);
+      response = lookup(xdr, info);
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
-      response = access(xdr, securityHandler, client);
+      response = access(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READLINK) {
-      response = readlink(xdr, securityHandler, client);
+      response = readlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READ) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
       }    
-      response = read(xdr, securityHandler, client);
+      response = read(xdr, info);
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
       }
@@ -1965,36 +2043,36 @@ public class RpcProgramNfs3 extends RpcP
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
       }
-      response = write(xdr, channel, xid, securityHandler, client);
+      response = write(xdr, info);
       // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
-      response = create(xdr, securityHandler, client);
+      response = create(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
-      response = mkdir(xdr, securityHandler, client);
+      response = mkdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
-      response = symlink(xdr, securityHandler, client);
+      response = symlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
-      response = mknod(xdr, securityHandler, client);
+      response = mknod(xdr, info);
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
-      response = remove(xdr, securityHandler, client);
+      response = remove(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
-      response = rmdir(xdr, securityHandler, client);
+      response = rmdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RENAME) {
-      response = rename(xdr, securityHandler, client);
+      response = rename(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LINK) {
-      response = link(xdr, securityHandler, client);
+      response = link(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIR) {
-      response = readdir(xdr, securityHandler, client);
+      response = readdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
-      response = readdirplus(xdr, securityHandler, client);
+      response = readdirplus(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
-      response = fsstat(xdr, securityHandler, client);
+      response = fsstat(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
-      response = fsinfo(xdr, securityHandler, client);
+      response = fsinfo(xdr, info);
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
-      response = pathconf(xdr, securityHandler, client);
+      response = pathconf(xdr, info);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, channel, xid, securityHandler, client);
+      response = commit(xdr, info);
     } else {
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,
@@ -2027,8 +2105,21 @@ public class RpcProgramNfs3 extends RpcP
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
   
-  private boolean checkAccessPrivilege(final InetAddress client,
+  private boolean checkAccessPrivilege(RpcInfo info,
+      final AccessPrivilege expected) {
+    SocketAddress remoteAddress = info.remoteAddress();
+    return checkAccessPrivilege(remoteAddress, expected);
+  }
+
+  private boolean checkAccessPrivilege(SocketAddress remoteAddress,
       final AccessPrivilege expected) {
+    // Port monitoring
+    if (!doPortMonitoring(remoteAddress)) {
+      return false;
+    }
+    
+    // Check export table
+    InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
     AccessPrivilege access = exports.getAccessPrivilege(client);
     if (access == AccessPrivilege.NONE) {
       return false;
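
Taken together, the handlers in this file now follow one pattern: each takes the raw RpcInfo, derives the SecurityHandler from the RPC call header via getSecurityHandler(info), and runs the access check, which now performs port monitoring before consulting the export table. A hedged sketch of that shape (FOO3Response is a placeholder, not a real NFS response type):

    // Hypothetical handler showing the pattern this commit applies throughout.
    public FOO3Response foo(XDR xdr, RpcInfo info) {
      // Access check: port monitoring first, then the export table.
      if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
        return new FOO3Response(Nfs3Status.NFS3ERR_ACCES);
      }
      // Credentials now come from the RPC call header, not a method argument.
      SecurityHandler securityHandler = getSecurityHandler(info);
      DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
      if (dfsClient == null) {
        return new FOO3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
      }
      // ... procedure-specific work ...
      return new FOO3Response(Nfs3Status.NFS3_OK);
    }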

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Sat Jul 12 02:24:40 2014
@@ -58,6 +58,7 @@ public class WriteManager {
   private boolean asyncDataServiceStarted = false;
 
   private final int maxStreams;
+  private final boolean aixCompatMode;
 
   /**
    * The time limit to wait for accumulate reordered sequential writes to the
@@ -79,9 +80,11 @@ public class WriteManager {
     return fileContextCache.put(h, ctx);
   }
   
-  WriteManager(IdUserGroup iug, final NfsConfiguration config) {
+  WriteManager(IdUserGroup iug, final NfsConfiguration config,
+      boolean aixCompatMode) {
     this.iug = iug;
     this.config = config;
+    this.aixCompatMode = aixCompatMode;
     streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY,
         NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
     LOG.info("Stream timeout is " + streamTimeout + "ms.");
@@ -175,7 +178,7 @@ public class WriteManager {
       String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
           NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId(), dfsClient, iug);
+          + fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
 
       if (!addOpenFileStream(fileHandle, openFileCtx)) {
         LOG.info("Can't add new stream. Close it. Tell client to retry.");

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Sat Jul 12 02:24:40 2014
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -190,6 +190,29 @@ public class TestWrites {
     ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
   }
+  
+  @Test
+  public void testCheckCommitAixCompatMode() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+
+    // Last argument "true" here to enable AIX compatibility mode.
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup(new NfsConfiguration()), true);
+    
+    // Test fall-through to pendingWrites check in the event that commitOffset
+    // is greater than the number of bytes we've so far flushed.
+    Mockito.when(fos.getPos()).thenReturn((long) 2);
+    COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
+    Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED);
+    
+    // Test the case when we actually have received more bytes than we're trying
+    // to commit.
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    status = ctx.checkCommitInternal(5, null, 1, attr, false);
+    Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+  }
 
   @Test
   // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
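
For contrast with the new AIX test above, the default (non-AIX) path for the same inputs would queue the commit instead of falling through; a sketch grounded in the OpenFileCtx branch shown earlier in this commit (not part of the committed test):

    // Hypothetical companion check, default mode: with only 2 bytes flushed,
    // committing offset 5 queues a CommitCtx and returns COMMIT_WAIT.
    OpenFileCtx ctx2 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
        new IdUserGroup(new NfsConfiguration()));
    Mockito.when(fos.getPos()).thenReturn((long) 2);
    COMMIT_STATUS status2 = ctx2.checkCommitInternal(5, null, 1, attr, false);
    Assert.assertTrue(status2 == COMMIT_STATUS.COMMIT_WAIT);
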
@@ -207,7 +230,7 @@ public class TestWrites {
 
     FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
     COMMIT_STATUS ret;
-    WriteManager wm = new WriteManager(new IdUserGroup(config), config);
+    WriteManager wm = new WriteManager(new IdUserGroup(config), config, false);
     assertTrue(wm.addOpenFileStream(h, ctx));
     
     // Test inactive open file context
@@ -318,7 +341,7 @@ public class TestWrites {
       XDR createXdr = new XDR();
       createReq.serialize(createXdr);
       CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
       FileHandle handle = createRsp.getObjHandle();
 
       // Test DATA_SYNC
@@ -331,7 +354,7 @@ public class TestWrites {
       XDR writeXdr = new XDR();
       writeReq.serialize(writeXdr);
       nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
-          InetAddress.getLocalHost());
+          new InetSocketAddress("localhost", 1234));
 
       waitWrite(nfsd, handle, 60000);
 
@@ -340,7 +363,7 @@ public class TestWrites {
       XDR readXdr = new XDR();
       readReq.serialize(readXdr);
       READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
 
       assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
 
@@ -352,7 +375,7 @@ public class TestWrites {
       XDR createXdr2 = new XDR();
       createReq2.serialize(createXdr2);
       CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
       FileHandle handle2 = createRsp2.getObjHandle();
 
       WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
@@ -360,7 +383,7 @@ public class TestWrites {
       XDR writeXdr2 = new XDR();
       writeReq2.serialize(writeXdr2);
       nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
-          InetAddress.getLocalHost());
+          new InetSocketAddress("localhost", 1234));
 
       waitWrite(nfsd, handle2, 60000);
 
@@ -369,7 +392,7 @@ public class TestWrites {
       XDR readXdr2 = new XDR();
       readReq2.serialize(readXdr2);
       READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
 
       assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
       // FILE_SYNC should sync the file size

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Jul 12 02:24:40 2014
@@ -53,8 +53,6 @@ Trunk (Unreleased)
     HDFS-3030. Remove getProtocolVersion and getProtocolSignature from translators.
     (jitendra)
 
-    HDFS-2976. Remove unnecessary method (tokenRefetchNeeded) in DFSClient.
-
     HDFS-3111. Missing license headers in trunk. (umamahesh)
 
     HDFS-3091. Update the usage limitations of ReplaceDatanodeOnFailure policy in
@@ -95,8 +93,6 @@ Trunk (Unreleased)
     HDFS-3768. Exception in TestJettyHelper is incorrect. 
     (Eli Reisman via jghoman)
 
-    HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
-
     HDFS-2580. NameNode#main(...) can make use of GenericOptionsParser. (harsh)
 
     HDFS-2127. Add a test that ensure AccessControlExceptions contain
@@ -129,6 +125,9 @@ Trunk (Unreleased)
 
     HDFS-6252. Phase out the old web UI in HDFS. (wheat9)
 
+    HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable
+    directory. (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -197,9 +196,6 @@ Trunk (Unreleased)
     HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from
     Command. (Jing Zhao via suresh)
 
-    HADOOP-8158. Interrupting hadoop fs -put from the command line
-    causes a LeaseExpiredException. (daryn via harsh)
-
     HDFS-2434. TestNameNodeMetrics.testCorruptBlock fails intermittently.
     (Jing Zhao via suresh)
 
@@ -254,8 +250,55 @@ Trunk (Unreleased)
     HDFS-5794. Fix the inconsistency of layout version number of 
     ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
 
-    HDFS-6375. Listing extended attributes with the search permission.
-    (Charles Lamb via wang)
+Release 2.6.0 - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+    HDFS-6613. Improve logging in caching classes. (wang)
+
+    HDFS-6511. BlockManager#computeInvalidateWork() could do nothing. (Juan Yu via wang)
+
+    HDFS-6638. Shorten test run time with a smaller retry timeout setting.
+    (Liang Xie via cnauroth)
+
+    HDFS-6627. Rename DataNode#checkWriteAccess to checkReadAccess.
+    (Liang Xie via cnauroth)
+
+    HDFS-6645. Add test for successive Snapshots between XAttr modifications.
+    (Stephen Chu via jing9)
+
+    HDFS-6643. Refactor INodeWithAdditionalFields.PermissionStatusFormat and
+    INodeFile.HeaderFormat. (szetszwo)
+
+    HDFS-6640. Syntax for MKDIRS, CREATESYMLINK, and SETXATTR are given wrongly
+    in WebHdfs document (missed webhdfs/v1). (Stephen Chu via jing9)
+
+    HDFS-5202. Support Centralized Cache Management on Windows. (cnauroth)
+
+    HDFS-2976. Remove unnecessary method (tokenRefetchNeeded) in DFSClient.
+    (Uma Maheswara Rao G)
+
+    HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    HDFS-6617. Flake TestDFSZKFailoverController.testManualFailoverWithDFSHAAdmin
+    due to a long edit log sync op. (Liang Xie via cnauroth)
+
+    HDFS-6646. [ HDFS Rolling Upgrade - Shell ] shutdownDatanode and getDatanodeInfo
+    usage is missed (Brahma Reddy Battula via vinayakumarb)
+
+    HDFS-6630. Unable to fetch the block information by Browsing the file system on
+    Namenode UI through IE9 (Haohui Mai via vinayakumarb)
+
+    HADOOP-8158. Interrupting hadoop fs -put from the command line
+    causes a LeaseExpiredException. (daryn via harsh)
 
 Release 2.5.0 - UNRELEASED
 
@@ -452,6 +495,56 @@ Release 2.5.0 - UNRELEASED
     HDFS-6545. Finalizing rolling upgrade can make NN unavailable for a long
     duration. (kihwal)
 
+    HDFS-6530. Fix Balancer documentation.  (szetszwo)
+
+    HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. (wheat9)
+
+    HDFS-6403. Add metrics for log warnings reported by JVM pauses. (Yongjun
+    Zhang via atm)
+
+    HDFS-6557. Move the reference of fsimage to FSNamesystem. (wheat9)
+
+    HDFS-4667. Capture renamed files/directories in snapshot diff report. (jing9
+    and Binglin Chang via jing9)
+
+    HDFS-6507. Improve DFSAdmin to support HA cluster better.
+    (Zesheng Wu via vinayakumarb)
+
+    HDFS-6578. add toString method to DatanodeStorage for easier debugging.
+    (Yongjun Zhang via Arpit Agarwal)
+
+    HDFS-6562. Refactor rename() in FSDirectory. (wheat9)
+
+    HDFS-6486. Add user doc for XAttrs via WebHDFS. (Yi Liu via umamahesh)
+
+    HDFS-6430. HTTPFS - Implement XAttr support. (Yi Liu via tucu)
+
+    HDFS-6593. Move SnapshotDiffInfo out of INodeDirectorySnapshottable.
+    (Jing Zhao via wheat9)
+
+    HDFS-6595. Allow the maximum threads for balancing on datanodes to be
+    configurable. (Benoy Antony via szetszwo)
+
+    HDFS-6572. Add an option to the NameNode that prints the software and
+    on-disk image versions. (Charles Lamb via cnauroth)
+
+    HDFS-6603. Add XAttr with ACL test. (Stephen Chu via cnauroth)
+
+    HDFS-6612. MiniDFSNNTopology#simpleFederatedTopology(int)
+    always hardcode nameservice ID. (Juan Yu via wang)
+
+    HDFS-6614. shorten TestPread run time with a smaller retry timeout setting.
+    (Liang Xie via cnauroth)
+
+    HDFS-6610. TestShortCircuitLocalRead tests sometimes timeout on slow
+    machines. (Charles Lamb via wang)
+
+    HDFS-6620. Snapshot docs should specify about preserve options with cp command
+    (Stephen Chu via umamahesh)
+
+    HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second
+    instead of millisecond. (Juan Yu via wang)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
@@ -459,6 +552,8 @@ Release 2.5.0 - UNRELEASED
     HDFS-6460. Ignore stale and decommissioned nodes in
     NetworkTopology#sortByDistance. (Yongjun Zhang via wang)
 
+    HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
+
   BUG FIXES 
 
     HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.
@@ -651,6 +746,82 @@ Release 2.5.0 - UNRELEASED
     HDFS-6527. Edit log corruption due to defered INode removal. (kihwal and
     jing9 via jing9)
 
+    HDFS-6552. add DN storage to a BlockInfo will not replace the different
+    storage from same DN. (Amir Langer via Arpit Agarwal)
+
+    HDFS-6551. Rename with OVERWRITE option may throw NPE when the target
+    file/directory is a reference INode. (jing9)
+
+    HDFS-6439. NFS should not reject NFS requests to the NULL procedure whether
+    port monitoring is enabled or not. (brandonli)
+
+    HDFS-6559. Fix wrong option "dfsadmin -rollingUpgrade start" in the
+    document. (Akira Ajisaka via Arpit Agarwal)
+
+    HDFS-6553. Add missing DeprecationDeltas for NFS Kerberos configurations
+    (Stephen Chu via brandonli)
+
+    HDFS-6563. NameNode cannot save fsimage in certain circumstances when
+    snapshots are in use. (atm)
+
+    HDFS-3848. A Bug in recoverLeaseInternal method of FSNameSystem class
+    (Hooman Peiro Sajjad and Chen He via kihwal)
+
+    HDFS-6549. Add support for accessing the NFS gateway from the AIX NFS
+    client. (atm)
+
+    HDFS-6535. HDFS quota update is wrong when file is appended. (George Wong
+    via jing9)
+
+    HDFS-6222. Remove background token renewer from webhdfs.
+    (Rushabh Shah and Daryn Sharp via cnauroth)
+
+    HDFS-6580. FSNamesystem.mkdirsInt should call the getAuditFileInfo()
+    wrapper. (Zhilei Xu via wheat9)
+
+    HDFS-6587. Bug in TestBPOfferService can cause test failure. (Zhilei Xu
+    via Arpit Agarwal)
+
+    HDFS-6598. Fix a typo in message issued from explorer.js. (Yongjun Zhang
+    via wheat9)
+
+    HDFS-6475. WebHdfs clients fail without retry because incorrect handling
+    of StandbyException. (Yongjun Zhang via atm)
+
+    HADOOP-10701. NFS should not validate the access premission only based on
+    the user's primary group (Harsh J via atm)
+
+    HDFS-6556. Refine XAttr permissions (umamahesh)
+
+    HDFS-6601. Issues in finalizing rolling upgrade when there is a layout 
+    version change (kihwal)
+
+    HDFS-6418. Regression: DFS_NAMENODE_USER_NAME_KEY missing
+    (szetszwo via stevel)
+
+    HDFS-6558. Missing newline in the description of dfsadmin -rollingUpgrade.
+    (Chen He via kihwal)
+
+    HDFS-6591. while loop is executed tens of thousands of times in Hedged Read
+    (Liang Xie via cnauroth)
+
+    HDFS-6604. The short-circuit cache doesn't correctly time out replicas that
+    haven't been used in a while (cmccabe)
+
+    HDFS-4286. Changes from BOOKKEEPER-203 broken capability of including 
+    bookkeeper-server jar in hidden package of BKJM (Rakesh R via umamahesh)
+
+    HDFS-4221. Remove the format limitation point from BKJM documentation as HDFS-3810
+    closed. (Rakesh R via umamahesh)
+
+    HDFS-5411. Update Bookkeeper dependency to 4.2.3. (Rakesh R via umamahesh)
+
+    HDFS-6631. TestPread#testHedgedReadLoopTooManyTimes fails intermittently.
+    (Liang Xie via cnauroth)
+
+    HDFS-6647. Edit log corruption when pipeline recovery occurs for deleted 
+    file present in snapshot (kihwal)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
@@ -714,6 +885,21 @@ Release 2.5.0 - UNRELEASED
     HDFS-6374. setXAttr should require the user to be the owner of the file
     or directory (Charles Lamb via wang)
 
+    HDFS-6375. Listing extended attributes with the search permission.
+    (Charles Lamb via wang)
+
+    HDFS-6492. Support create-time xattrs and atomically setting multiple
+    xattrs. (wang)
+
+    HDFS-6312. WebHdfs HA failover is broken on secure clusters. 
+    (daryn via tucu)
+
+    HDFS-6618. FSNamesystem#delete drops the FSN lock between removing INodes
+    from the tree and deleting them from the inode map (kihwal via cmccabe)
+
+    HDFS-6622. Rename and AddBlock may race and produce invalid edits (kihwal
+    via cmccabe)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml Sat Jul 12 02:24:40 2014
@@ -163,38 +163,24 @@ http://maven.apache.org/xsd/maven-4.0.0.
       <build>
         <plugins>
           <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-shade-plugin</artifactId>
-            <version>1.5</version>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <version>2.8</version>
             <executions>
               <execution>
+                <id>dist</id>
                 <phase>package</phase>
                 <goals>
-                  <goal>shade</goal>
+                  <goal>copy</goal>
                 </goals>
                 <configuration>
-                  <createDependencyReducedPom>false</createDependencyReducedPom>
-                  <artifactSet>
-                    <includes>
-                      <include>org.apache.bookkeeper:bookkeeper-server</include>
-                      <include>org.apache.zookeeper:zookeeper</include>
-                      <include>org.jboss.netty:netty</include>
-                    </includes>
-                  </artifactSet>
-                <relocations>
-                  <relocation>
-                    <pattern>org.apache.bookkeeper</pattern>
-                    <shadedPattern>hidden.bkjournal.org.apache.bookkeeper</shadedPattern>
-                  </relocation>
-                  <relocation>
-                    <pattern>org.apache.zookeeper</pattern>
-                    <shadedPattern>hidden.bkjournal.org.apache.zookeeper</shadedPattern>
-                  </relocation>
-                  <relocation>
-                    <pattern>org.jboss.netty</pattern>
-                    <shadedPattern>hidden.bkjournal.org.jboss.netty</shadedPattern>
-                  </relocation>
-                </relocations>
+                  <artifactItems>
+                    <artifactItem>
+                      <groupId>org.apache.bookkeeper</groupId>
+                      <artifactId>bookkeeper-server</artifactId>
+                      <type>jar</type>
+                    </artifactItem>
+                  </artifactItems>
+                  <outputDirectory>${project.build.directory}/lib</outputDirectory>
                 </configuration>
               </execution>
             </executions>

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java Sat Jul 12 02:24:40 2014
@@ -237,7 +237,7 @@ public class BookKeeperJournalManager im
         zkPathLatch.countDown();
       }
     };
-    ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
+    ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
 
     try {
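
The rename makes the asynchronous nature of the create explicit; the surrounding code blocks on zkPathLatch until the callback fires. A standalone sketch of that create-then-await pattern against the plain ZooKeeper API (illustrative only; unlike ZkUtils.asyncCreateFullPathOptimistic, the plain API does not create missing parent znodes):

    // Sketch: issue an async create and wait for the callback to complete.
    final CountDownLatch latch = new CountDownLatch(1);
    zkc.create("/some/path", new byte[0], Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT,
        new AsyncCallback.StringCallback() {
          @Override
          public void processResult(int rc, String path, Object ctx, String name) {
            // Check rc against KeeperException.Code.OK before trusting 'name'.
            latch.countDown();
          }
        }, null);
    if (!latch.await(10, TimeUnit.SECONDS)) {
      throw new IOException("Timed out creating /some/path");
    }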

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java Sat Jul 12 02:24:40 2014
@@ -149,13 +149,16 @@ class BKJMUtil {
   int checkBookiesUp(int count, int timeout) throws Exception {
     ZooKeeper zkc = connectZooKeeper();
     try {
-      boolean up = false;
       int mostRecentSize = 0;
       for (int i = 0; i < timeout; i++) {
         try {
           List<String> children = zkc.getChildren("/ledgers/available",
                                                   false);
           mostRecentSize = children.size();
+          // Skip the 'readonly' znode, which is used for keeping read-only bookie details
+          if (children.contains("readonly")) {
+            mostRecentSize = children.size() - 1;
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug("Found " + mostRecentSize + " bookies up, "
                       + "waiting for " + count);
@@ -166,7 +169,6 @@ class BKJMUtil {
             }
           }
           if (mostRecentSize == count) {
-            up = true;
             break;
           }
         } catch (KeeperException e) {

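The hunk above accounts for BookKeeper's special "readonly" znode, which sits alongside the bookie registration nodes under /ledgers/available but is a container for read-only bookie details, not a bookie itself. Pulled out as a helper, the counting logic is roughly this (class and method names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class BookieCountSketch {
      // Count writable bookies registered under /ledgers/available,
      // ignoring the special "readonly" container znode.
      static int countUpBookies(List<String> children) {
        int size = children.size();
        if (children.contains("readonly")) {
          size--; // container znode, not a bookie
        }
        return size;
      }

      public static void main(String[] args) {
        System.out.println(countUpBookies(
            Arrays.asList("bookie1:3181", "bookie2:3181", "readonly"))); // prints 2
      }
    }
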
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd Sat Jul 12 02:24:40 2014
@@ -47,7 +47,7 @@ if "%1" == "--config" (
       goto print_usage
   )
 
-  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir
+  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin
   for %%i in ( %hdfscommands% ) do (
     if %hdfs-command% == %%i set hdfscommand=true
   )
@@ -146,6 +146,10 @@ goto :eof
   set CLASS=org.apache.hadoop.hdfs.tools.snapshot.LsSnapshottableDir
   goto :eof
 
+:cacheadmin
+  set CLASS=org.apache.hadoop.hdfs.tools.CacheAdmin
+  goto :eof
+
 @rem This changes %1, %2 etc. Hence those cannot be used after calling this.
 :make_command_arguments
   if "%1" == "--config" (
@@ -193,6 +197,7 @@ goto :eof
   @echo                        current directory contents with a snapshot
   @echo   lsSnapshottableDir   list all snapshottable dirs owned by the current user
  @echo                        Use -help to see options
+  @echo   cacheadmin           configure the HDFS cache
   @echo.
   @echo Most commands print help when invoked w/o parameters.
 

Propchange: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1606534
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1603348-1609877

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java Sat Jul 12 02:24:40 2014
@@ -19,6 +19,8 @@ package org.apache.hadoop.fs;
 
 import java.util.Arrays;
 
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
@@ -28,11 +30,11 @@ import org.apache.hadoop.classification.
  * namespaces are defined: user, trusted, security and system.
  *   1) USER namespace attributes may be used by any user to store
  *   arbitrary information. Access permissions in this namespace are
- *   defined by a file directory's permission bits.
+ *   defined by a file directory's permission bits. For sticky directories,
+ *   only the owner and privileged user can write attributes.
  * <br>
  *   2) TRUSTED namespace attributes are only visible and accessible to
- *   privileged users (a file or directory's owner or the fs
- *   admin). This namespace is available from both user space
+ *   privileged users. This namespace is available from both user space
  *   (filesystem API) and fs kernel.
  * <br>
  *   3) SYSTEM namespace attributes are used by the fs kernel to store
@@ -105,42 +107,47 @@ public class XAttr {
   
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    result = prime * result + ((ns == null) ? 0 : ns.hashCode());
-    result = prime * result + Arrays.hashCode(value);
-    return result;
+    return new HashCodeBuilder(811, 67)
+        .append(name)
+        .append(ns)
+        .append(value)
+        .toHashCode();
   }
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
+    if (obj == null) { return false; }
+    if (obj == this) { return true; }
+    if (obj.getClass() != getClass()) {
       return false;
     }
-    XAttr other = (XAttr) obj;
-    if (name == null) {
-      if (other.name != null) {
-        return false;
-      }
-    } else if (!name.equals(other.name)) {
-      return false;
-    }
-    if (ns != other.ns) {
-      return false;
-    }
-    if (!Arrays.equals(value, other.value)) {
+    XAttr rhs = (XAttr) obj;
+    return new EqualsBuilder()
+        .append(ns, rhs.ns)
+        .append(name, rhs.name)
+        .append(value, rhs.value)
+        .isEquals();
+  }
+
+  /**
+   * Similar to {@link #equals(Object)}, except ignores the XAttr value.
+   *
+   * @param obj the object to compare against for equality
+   * @return true if the XAttrs are equal, ignoring the XAttr value
+   */
+  public boolean equalsIgnoreValue(Object obj) {
+    if (obj == null) { return false; }
+    if (obj == this) { return true; }
+    if (obj.getClass() != getClass()) {
       return false;
     }
-    return true;
+    XAttr rhs = (XAttr) obj;
+    return new EqualsBuilder()
+        .append(ns, rhs.ns)
+        .append(name, rhs.name)
+        .isEquals();
   }
-  
+
   @Override
   public String toString() {
     return "XAttr [ns=" + ns + ", name=" + name + ", value="

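The XAttr rewrite above swaps IDE-generated hashCode/equals boilerplate for Commons Lang's HashCodeBuilder and EqualsBuilder, which null-check fields and deep-compare arrays in one fluent chain. A self-contained sketch of the same pattern on a hypothetical two-field class (the seed pair passed to HashCodeBuilder, 811/67 in the patch, just needs to be two odd numbers, ideally distinct per class):

    import org.apache.commons.lang.builder.EqualsBuilder;
    import org.apache.commons.lang.builder.HashCodeBuilder;

    public class Pair {
      private final String name;
      private final byte[] value;

      public Pair(String name, byte[] value) {
        this.name = name;
        this.value = value;
      }

      @Override
      public int hashCode() {
        // append() handles nulls and hashes the byte[] by contents.
        return new HashCodeBuilder(17, 37).append(name).append(value).toHashCode();
      }

      @Override
      public boolean equals(Object obj) {
        if (obj == null) { return false; }
        if (obj == this) { return true; }
        if (obj.getClass() != getClass()) { return false; }
        Pair rhs = (Pair) obj;
        // EqualsBuilder compares arrays element-wise, not by reference.
        return new EqualsBuilder().append(name, rhs.name)
            .append(value, rhs.value).isEquals();
      }
    }
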
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Sat Jul 12 02:24:40 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,6 +31,7 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Private
 public class DFSClientFaultInjector {
   public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+  public static AtomicLong exceptionNum = new AtomicLong(0);
 
   public static DFSClientFaultInjector get() {
     return instance;
@@ -47,4 +50,8 @@ public class DFSClientFaultInjector {
   }
 
   public void startFetchFromDatanode() {}
+
+  public void fetchFromDatanodeException() {}
+
+  public void readFromDatanodeDelay() {}
 }

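The two new no-op methods extend the fault-injector pattern: production code calls the hooks at interesting points in the read path, and a test swaps the static instance for an overriding subclass (HDFS tests typically arrange this with Mockito) to inject failures or delays. A minimal, self-contained sketch of the pattern, with illustrative names:

    import java.util.concurrent.atomic.AtomicLong;

    public class FaultInjectorSketch {
      // Production hook points: no-ops unless a test overrides them.
      public static class FaultInjector {
        public static FaultInjector instance = new FaultInjector();
        public static FaultInjector get() { return instance; }
        public void fetchFromDatanodeException() {}
        public void readFromDatanodeDelay() {}
      }

      public static void main(String[] args) {
        final AtomicLong exceptionNum = new AtomicLong(0);
        // The test installs an injector that fails the first two fetches.
        FaultInjector.instance = new FaultInjector() {
          @Override
          public void fetchFromDatanodeException() {
            if (exceptionNum.incrementAndGet() <= 2) {
              throw new RuntimeException("injected fetch failure");
            }
          }
        };
        // Production code simply calls the hook; only the test feels it.
        for (int i = 0; i < 3; i++) {
          try {
            FaultInjector.get().fetchFromDatanodeException();
            System.out.println("fetch " + i + " ok");
          } catch (RuntimeException e) {
            System.out.println("fetch " + i + " failed: " + e.getMessage());
          }
        }
      }
    }
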
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Sat Jul 12 02:24:40 2014
@@ -105,6 +105,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
+  public static final int     DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
   public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
   public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
@@ -250,8 +252,8 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L;
 
   /** Pending period of block deletion since NameNode startup */
-  public static final String  DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY = "dfs.namenode.startup.delay.block.deletion.ms";
-  public static final long    DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT = 0L;
+  public static final String  DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec";
+  public static final long    DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT = 0L;
 
   // Whether to enable datanode's stale state detection and usage for reads
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
@@ -494,16 +496,26 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
   public static final String  DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
   public static final String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal";
+  @Deprecated
+  public static final String  DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths";
   public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT = "/dev/shm,/tmp";
   public static final String  DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   public static final int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
   public static final String  DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String  DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.namenode.kerberos.principal";
+  @Deprecated
+  public static final String  DFS_NAMENODE_USER_NAME_KEY = DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.namenode.kerberos.internal.spnego.principal";
+  @Deprecated
+  public static final String  DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
   public static final String  DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY = "dfs.secondary.namenode.keytab.file";
   public static final String  DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.secondary.namenode.kerberos.principal";
+  @Deprecated
+  public static final String  DFS_SECONDARY_NAMENODE_USER_NAME_KEY = DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.secondary.namenode.kerberos.internal.spnego.principal";
+  @Deprecated
+  public static final String  DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
   public static final String  DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY = "dfs.namenode.legacy-oiv-image.dir";

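Two notes on the DFSConfigKeys hunks: the key/default constant pairs (such as the new balancer mover limit) are consumed through the usual Configuration getters, and the @Deprecated aliases keep the old *_USER_NAME_KEY names compiling, with a warning, while pointing at the renamed Kerberos-principal constants. Note also that the startup block-deletion delay key changes its unit from milliseconds to seconds along with its name. A minimal consumer sketch; the wiring around it is illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class ConfKeySketch {
      static final String MAX_MOVES_KEY =
          "dfs.datanode.balance.max.concurrent.moves";
      static final int MAX_MOVES_DEFAULT = 5;

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Falls back to the compiled-in default when the key is unset.
        int maxMoves = conf.getInt(MAX_MOVES_KEY, MAX_MOVES_DEFAULT);
        System.out.println("max concurrent moves per datanode: " + maxMoves);
      }
    }
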
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Sat Jul 12 02:24:40 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -32,12 +33,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -81,6 +84,7 @@ implements ByteBufferReadable, CanSetDro
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
+  private long hedgedReadOpsLoopNumForTesting = 0;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -976,20 +980,15 @@ implements ByteBufferReadable, CanSetDro
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch latch) {
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        try {
-          byte[] buf = bb.array();
-          int offset = bb.position();
-          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-              corruptedBlockMap);
-          return bb;
-        } finally {
-          latch.countDown();
-        }
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return bb;
       }
     };
   }
@@ -1018,6 +1017,7 @@ implements ByteBufferReadable, CanSetDro
       BlockReader reader = null;
 
       try {
+        DFSClientFaultInjector.get().fetchFromDatanodeException();
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
         reader = new BlockReaderFactory(dfsClient.getConf()).
@@ -1044,6 +1044,7 @@ implements ByteBufferReadable, CanSetDro
           throw new IOException("truncated return from reader.read(): " +
                                 "excpected " + len + ", got " + nread);
         }
+        DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
       } catch (ChecksumException e) {
         String msg = "fetchBlockByteRange(). Got a checksum exception for "
@@ -1097,35 +1098,43 @@ implements ByteBufferReadable, CanSetDro
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
+    CompletionService<ByteBuffer> hedgedService =
+        new ExecutorCompletionService<ByteBuffer>(
+        dfsClient.getHedgedReadsThreadPool());
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     block = getBlockAt(block.getStartOffset(), false);
-    // Latch shared by all outstanding reads.  First to finish closes
-    CountDownLatch hasReceivedResult = new CountDownLatch(1);
     while (true) {
+      // See HDFS-6591; this metric is used to verify/catch unnecessary loops.
+      hedgedReadOpsLoopNumForTesting++;
       DNAddrPair chosenNode = null;
-      Future<ByteBuffer> future = null;
-      // futures is null if there is no request already executing.
+      // there is no request already executing.
       if (futures.isEmpty()) {
-        // chooseDataNode is a commitment.  If no node, we go to
-        // the NN to reget block locations.  Only go here on first read.
+        // chooseDataNode is a commitment. If no node, we go to
+        // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
-        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
+        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+            chosenNode, block, start, end, bb, corruptedBlockMap);
+        Future<ByteBuffer> firstRequest = hedgedService
+            .submit(getFromDataNodeCallable);
+        futures.add(firstRequest);
         try {
-          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
-          return;
-        } catch (TimeoutException e) {
+          Future<ByteBuffer> future = hedgedService.poll(
+              dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          if (future != null) {
+            future.get();
+            return;
+          }
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
-              "ms to read from " + chosenNode.info + "; spawning hedged read");
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
+                + "ms to read from " + chosenNode.info
+                + "; spawning hedged read");
           }
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
-          futures.add(future);
           continue; // no need to refresh block locations
         } catch (InterruptedException e) {
           // Ignore
@@ -1133,25 +1142,31 @@ implements ByteBufferReadable, CanSetDro
           // Ignore already logged in the call.
         }
       } else {
-        // We are starting up a 'hedged' read.  We have a read already
+        // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          try {
+            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          } catch (IOException ioe) {
+            chosenNode = chooseDataNode(block, ignored);
+          }
           bb = ByteBuffer.allocate(len);
-          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-            corruptedBlockMap, hasReceivedResult);
-          futures.add(future);
+          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+              chosenNode, block, start, end, bb, corruptedBlockMap);
+          Future<ByteBuffer> oneMoreRequest = hedgedService
+              .submit(getFromDataNodeCallable);
+          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Failed getting node for hedged read: " +
-              ioe.getMessage());
+            DFSClient.LOG.debug("Failed getting node for hedged read: "
+                + ioe.getMessage());
           }
         }
         // if not succeeded. Submit callables for each datanode in a loop, wait
         // for a fixed interval and get the result from the fastest one.
         try {
-          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          ByteBuffer result = getFirstToComplete(hedgedService, futures);
           // cancel the rest.
           cancelAll(futures);
           if (result.array() != buf) { // compare the array pointers
@@ -1163,50 +1178,43 @@ implements ByteBufferReadable, CanSetDro
           }
           return;
         } catch (InterruptedException ie) {
-          // Ignore
-        } catch (ExecutionException e) {
-          // exception already handled in the call method. getFirstToComplete
-          // will remove the failing future from the list. nothing more to do.
+          // Ignore and retry
         }
-        // We got here if exception.  Ignore this node on next go around IFF
+        // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
           ignored.add(chosenNode.info);
         }
       }
-      // executed if we get an error from a data node
-      block = getBlockAt(block.getStartOffset(), false);
     }
   }
 
-  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
-      final LocatedBlock block, long start,
-      final long end, final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch hasReceivedResult) {
-    Callable<ByteBuffer> getFromDataNodeCallable =
-        getFromOneDataNode(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
-    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  @VisibleForTesting
+  public long getHedgedReadOpsLoopNumForTesting() {
+    return hedgedReadOpsLoopNumForTesting;
   }
 
-  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
-      CountDownLatch latch) throws ExecutionException, InterruptedException {
-    latch.await();
-    for (Future<ByteBuffer> future : futures) {
-      if (future.isDone()) {
-        try {
-          return future.get();
-        } catch (ExecutionException e) {
-          // already logged in the Callable
-          futures.remove(future);
-          throw e;
-        }
-      }
+  private ByteBuffer getFirstToComplete(
+      CompletionService<ByteBuffer> hedgedService,
+      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("let's retry");
+    }
+    Future<ByteBuffer> future = null;
+    try {
+      future = hedgedService.take();
+      ByteBuffer bb = future.get();
+      futures.remove(future);
+      return bb;
+    } catch (ExecutionException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    } catch (CancellationException ce) {
+      // already logged in the Callable
+      futures.remove(future);
     }
-    throw new InterruptedException("latch has counted down to zero but no"
-        + "result available yet, for safety try to request another one from"
-        + "outside loop, this should be rare");
+
+    throw new InterruptedException("let's retry");
   }
 
   private void cancelAll(List<Future<ByteBuffer>> futures) {
@@ -1367,10 +1375,10 @@ implements ByteBufferReadable, CanSetDro
   @Override
   public synchronized void seek(long targetPos) throws IOException {
     if (targetPos > getFileLength()) {
-      throw new IOException("Cannot seek after EOF");
+      throw new EOFException("Cannot seek after EOF");
     }
     if (targetPos < 0) {
-      throw new IOException("Cannot seek to negative offset");
+      throw new EOFException("Cannot seek to negative offset");
     }
     if (closed) {
       throw new IOException("Stream is closed!");

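The heart of the DFSInputStream change: the shared CountDownLatch is gone, and hedged reads now ride on an ExecutorCompletionService. The first read is submitted and polled for up to the hedged-read timeout; if it has not finished, a second read is hedged against another datanode and take() hands back whichever completes first, after which the stragglers are cancelled. A self-contained sketch of that pattern with simulated reads (names, timings, and pool sizing all illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class HedgedReadSketch {
      static Callable<String> read(final String node, final long millis) {
        return new Callable<String>() {
          @Override
          public String call() throws Exception {
            Thread.sleep(millis); // stand-in for a datanode read
            return "data from " + node;
          }
        };
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> hedgedService =
            new ExecutorCompletionService<String>(pool);
        List<Future<String>> futures = new ArrayList<Future<String>>();

        futures.add(hedgedService.submit(read("dn1", 500))); // slow primary
        // Wait only the hedge timeout for the primary before spawning a hedge.
        Future<String> done = hedgedService.poll(50, TimeUnit.MILLISECONDS);
        if (done == null) {
          futures.add(hedgedService.submit(read("dn2", 20))); // the hedge
          done = hedgedService.take(); // first of the two to complete
        }
        try {
          System.out.println(done.get());
        } catch (ExecutionException e) {
          // the real code removes the failed future and retries elsewhere
        }
        for (Future<String> f : futures) {
          f.cancel(true); // cancel whatever is still running
        }
        pool.shutdown();
      }
    }
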
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Sat Jul 12 02:24:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -353,18 +354,42 @@ public class HAUtil {
    */
   public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice(
       Configuration conf, String nsId) throws IOException {
+    List<ProxyAndInfo<ClientProtocol>> proxies =
+        getProxiesForAllNameNodesInNameservice(conf, nsId, ClientProtocol.class);
+
+    List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>(
+        proxies.size());
+    for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
+      namenodes.add(proxy.getProxy());
+    }
+    return namenodes;
+  }
+
+  /**
+   * Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC
+   * call should be made on every NN in an HA nameservice, not just the active.
+   *
+   * @param conf configuration
+   * @param nsId the nameservice to get all of the proxies for.
+   * @param xface the protocol class.
+   * @return a list of RPC proxies for each NN in the nameservice.
+   * @throws IOException in the event of error.
+   */
+  public static <T> List<ProxyAndInfo<T>> getProxiesForAllNameNodesInNameservice(
+      Configuration conf, String nsId, Class<T> xface) throws IOException {
     Map<String, InetSocketAddress> nnAddresses =
         DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
     
-    List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>();
+    List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>(
+        nnAddresses.size());
     for (InetSocketAddress nnAddress : nnAddresses.values()) {
-      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+      NameNodeProxies.ProxyAndInfo<T> proxyInfo = null;
       proxyInfo = NameNodeProxies.createNonHAProxy(conf,
-          nnAddress, ClientProtocol.class,
+          nnAddress, xface,
           UserGroupInformation.getCurrentUser(), false);
-      namenodes.add(proxyInfo.getProxy());
+      proxies.add(proxyInfo);
     }
-    return namenodes;
+    return proxies;
   }
   
   /**

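The generic overload above lets an admin operation hit every NameNode in a nameservice over an arbitrary protocol, and the returned ProxyAndInfo (which gains getAddress() in the next file) identifies which NN each proxy targets. A hypothetical caller, sketched under the assumption that the protocol of interest is RefreshUserMappingsProtocol:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HAUtil;
    import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
    import org.apache.hadoop.security.RefreshUserMappingsProtocol;

    public class RefreshAllSketch {
      static void refreshAll(Configuration conf, String nsId) throws IOException {
        // One proxy per NameNode in the nameservice, active and standby alike.
        List<ProxyAndInfo<RefreshUserMappingsProtocol>> proxies =
            HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId,
                RefreshUserMappingsProtocol.class);
        for (ProxyAndInfo<RefreshUserMappingsProtocol> proxy : proxies) {
          proxy.getProxy().refreshUserToGroupsMappings();
          System.out.println("refreshed " + proxy.getAddress());
        }
      }
    }
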
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Sat Jul 12 02:24:40 2014
@@ -106,10 +106,13 @@ public class NameNodeProxies {
   public static class ProxyAndInfo<PROXYTYPE> {
     private final PROXYTYPE proxy;
     private final Text dtService;
+    private final InetSocketAddress address;
     
-    public ProxyAndInfo(PROXYTYPE proxy, Text dtService) {
+    public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
+        InetSocketAddress address) {
       this.proxy = proxy;
       this.dtService = dtService;
+      this.address = address;
     }
     
     public PROXYTYPE getProxy() {
@@ -119,6 +122,10 @@ public class NameNodeProxies {
     public Text getDelegationTokenService() {
       return dtService;
     }
+
+    public InetSocketAddress getAddress() {
+      return address;
+    }
   }
 
   /**
@@ -161,7 +168,8 @@ public class NameNodeProxies {
         dtService = SecurityUtil.buildTokenService(
             NameNode.getAddress(nameNodeUri));
       }
-      return new ProxyAndInfo<T>(proxy, dtService);
+      return new ProxyAndInfo<T>(proxy, dtService,
+          NameNode.getAddress(nameNodeUri));
     }
   }
   
@@ -221,7 +229,8 @@ public class NameNodeProxies {
         dtService = SecurityUtil.buildTokenService(
             NameNode.getAddress(nameNodeUri));
       }
-      return new ProxyAndInfo<T>(proxy, dtService);
+      return new ProxyAndInfo<T>(proxy, dtService,
+          NameNode.getAddress(nameNodeUri));
     } else {
       LOG.warn("Currently creating proxy using " +
       		"LossyRetryInvocationHandler requires NN HA setup");
@@ -274,7 +283,7 @@ public class NameNodeProxies {
       throw new IllegalStateException(message);
     }
 
-    return new ProxyAndInfo<T>(proxy, dtService);
+    return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
   }
   
   private static JournalProtocol createNNProxyWithJournalProtocol(


