hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1419193 [2/3] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/wsrs/ hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/wsrs/ hadoop-hdfs/ hadoop-hdfs/src/main/java/ had...
Date: Mon, 10 Dec 2012 03:37:46 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Dec 10 03:37:32 2012
@@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
@@ -111,6 +113,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
@@ -121,6 +124,7 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -165,12 +169,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
-import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
@@ -250,32 +252,32 @@ public class FSNamesystem implements Nam
       }
   };
 
-  private static final void logAuditEvent(UserGroupInformation ugi,
+  private boolean isAuditEnabled() {
+    return !isDefaultAuditLogger || auditLog.isInfoEnabled();
+  }
+
+  private void logAuditEvent(UserGroupInformation ugi,
       InetAddress addr, String cmd, String src, String dst,
       HdfsFileStatus stat) {
     logAuditEvent(true, ugi, addr, cmd, src, dst, stat);
   }
 
-  private static final void logAuditEvent(boolean succeeded,
+  private void logAuditEvent(boolean succeeded,
       UserGroupInformation ugi, InetAddress addr, String cmd, String src,
       String dst, HdfsFileStatus stat) {
-    final StringBuilder sb = auditBuffer.get();
-    sb.setLength(0);
-    sb.append("allowed=").append(succeeded).append("\t");
-    sb.append("ugi=").append(ugi).append("\t");
-    sb.append("ip=").append(addr).append("\t");
-    sb.append("cmd=").append(cmd).append("\t");
-    sb.append("src=").append(src).append("\t");
-    sb.append("dst=").append(dst).append("\t");
-    if (null == stat) {
-      sb.append("perm=null");
-    } else {
-      sb.append("perm=");
-      sb.append(stat.getOwner()).append(":");
-      sb.append(stat.getGroup()).append(":");
-      sb.append(stat.getPermission());
+    FileStatus status = null;
+    if (stat != null) {
+      Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
+      Path path = dst != null ? new Path(dst) : new Path(src);
+      status = new FileStatus(stat.getLen(), stat.isDir(),
+          stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
+          stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
+          stat.getGroup(), symlink, path);
+    }
+    for (AuditLogger logger : auditLoggers) {
+      logger.logAuditEvent(succeeded, ugi.toString(), addr,
+          cmd, src, dst, status);
     }
-    auditLog.info(sb);
   }
 
   /**
@@ -308,6 +310,11 @@ public class FSNamesystem implements Nam
   final DelegationTokenSecretManager dtSecretManager;
   private final boolean alwaysUseDelegationTokensForTests;
   
+  // Tracks whether the default audit logger is the only configured audit
+  // logger; this allows isAuditEnabled() to return false in case the
+  // underlying logger is disabled, and avoid some unnecessary work.
+  private final boolean isDefaultAuditLogger;
+  private final List<AuditLogger> auditLoggers;
 
   /** The namespace tree. */
   FSDirectory dir;
@@ -542,14 +549,50 @@ public class FSNamesystem implements Nam
       this.dir = new FSDirectory(fsImage, this, conf);
       this.snapshotManager = new SnapshotManager(this, dir);
       this.safeMode = new SafeModeInfo(conf);
-
+      this.auditLoggers = initAuditLoggers(conf);
+      this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
+        auditLoggers.get(0) instanceof DefaultAuditLogger;
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
       throw e;
+    } catch (RuntimeException re) {
+      LOG.error(getClass().getSimpleName() + " initialization failed.", re);
+      close();
+      throw re;
     }
   }
 
+  private List<AuditLogger> initAuditLoggers(Configuration conf) {
+    // Initialize the custom access loggers if configured.
+    Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
+    List<AuditLogger> auditLoggers = Lists.newArrayList();
+    if (alClasses != null && !alClasses.isEmpty()) {
+      for (String className : alClasses) {
+        try {
+          AuditLogger logger;
+          if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
+            logger = new DefaultAuditLogger();
+          } else {
+            logger = (AuditLogger) Class.forName(className).newInstance();
+          }
+          logger.initialize(conf);
+          auditLoggers.add(logger);
+        } catch (RuntimeException re) {
+          throw re;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    // Make sure there is at least one logger installed.
+    if (auditLoggers.isEmpty()) {
+      auditLoggers.add(new DefaultAuditLogger());
+    }
+    return auditLoggers;
+  }
+
   void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
       throws IOException {
     // format before starting up if requested
@@ -1009,8 +1052,8 @@ public class FSNamesystem implements Nam
       // start in active.
       return haEnabled;
     }
-  
-    return haContext.getState() instanceof StandbyState;
+
+    return HAServiceState.STANDBY == haContext.getState().getServiceState();
   }
 
   /**
@@ -1036,7 +1079,8 @@ public class FSNamesystem implements Nam
     long totalInodes = this.dir.totalInodes();
     long totalBlocks = this.getBlocksTotal();
     out.println(totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks) + " total");
+        + " blocks = " + (totalInodes + totalBlocks)
+        + " total filesystem objects");
 
     blockManager.metaSave(out);
   }
@@ -1082,7 +1126,7 @@ public class FSNamesystem implements Nam
     try {
       setPermissionInt(src, permission);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setPermission", src, null, null);
@@ -1104,14 +1148,14 @@ public class FSNamesystem implements Nam
       }
       checkOwner(src);
       dir.setPermission(src, permission);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "setPermission", src, null, resultingStat);
@@ -1128,7 +1172,7 @@ public class FSNamesystem implements Nam
     try {
       setOwnerInt(src, username, group);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setOwner", src, null, null);
@@ -1159,14 +1203,14 @@ public class FSNamesystem implements Nam
         }
       }
       dir.setOwner(src, username, group);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "setOwner", src, null, resultingStat);
@@ -1209,7 +1253,7 @@ public class FSNamesystem implements Nam
       return getBlockLocationsInt(src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "open", src, null, null);
@@ -1235,7 +1279,7 @@ public class FSNamesystem implements Nam
     }
     final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
         offset, length, doAccessTime, needBlockToken);  
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "open", src, null, null);
@@ -1316,7 +1360,7 @@ public class FSNamesystem implements Nam
     try {
       concatInt(target, srcs);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getLoginUser(),
                       getRemoteIp(),
                       "concat", Arrays.toString(srcs), target, null);
@@ -1359,14 +1403,14 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
       concatInternal(target, srcs);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getLoginUser(),
                     getRemoteIp(),
                     "concat", Arrays.toString(srcs), target, resultingStat);
@@ -1492,7 +1536,7 @@ public class FSNamesystem implements Nam
     try {
       setTimesInt(src, mtime, atime);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setTimes", src, null, null);
@@ -1518,7 +1562,7 @@ public class FSNamesystem implements Nam
       INode inode = dir.getMutableINode(src);
       if (inode != null) {
         dir.setTimes(src, inode, mtime, atime, true);
-        if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        if (isAuditEnabled() && isExternalInvocation()) {
           final HdfsFileStatus stat = dir.getFileInfo(src, false);
           logAuditEvent(UserGroupInformation.getCurrentUser(),
                         getRemoteIp(),
@@ -1541,7 +1585,7 @@ public class FSNamesystem implements Nam
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "createSymlink", link, target, null);
@@ -1562,14 +1606,14 @@ public class FSNamesystem implements Nam
         verifyParentDir(link);
       }
       createSymlinkInternal(target, link, dirPerms, createParent);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(link, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "createSymlink", link, target, resultingStat);
@@ -1625,7 +1669,7 @@ public class FSNamesystem implements Nam
     try {
       return setReplicationInt(src, replication);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setReplication", src, null, null);
@@ -1661,7 +1705,7 @@ public class FSNamesystem implements Nam
     }
 
     getEditLog().logSync();
-    if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isFile && isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "setReplication", src, null, null);
@@ -1717,7 +1761,7 @@ public class FSNamesystem implements Nam
       startFileInt(src, permissions, holder, clientMachine, flag, createParent,
                    replication, blockSize);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "create", src, null, null);
@@ -1750,7 +1794,7 @@ public class FSNamesystem implements Nam
       }
     } 
 
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
@@ -2052,7 +2096,7 @@ public class FSNamesystem implements Nam
     try {
       return appendFileInt(src, holder, clientMachine);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "append", src, null, null);
@@ -2098,7 +2142,7 @@ public class FSNamesystem implements Nam
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "append", src, null, null);
@@ -2544,7 +2588,7 @@ public class FSNamesystem implements Nam
     try {
       return renameToInt(src, dst);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "rename", src, dst, null);
@@ -2566,14 +2610,14 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
 
       status = renameToInternal(src, dst);
-      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (status && isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (status && isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "rename", src, dst, resultingStat);
@@ -2595,15 +2639,15 @@ public class FSNamesystem implements Nam
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
+      //NOTE: yes, this is bad!  it's assuming much lower level behavior
+      //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
       checkParentAccess(src, FsAction.WRITE);
       checkAncestorAccess(actualdst, FsAction.WRITE);
     }
 
-    HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     if (dir.renameTo(src, dst)) {
-      unprotectedChangeLease(src, dst, dinfo);     // update lease with new filename
       return true;
     }
     return false;
@@ -2623,14 +2667,14 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
 
       renameToInternal(src, dst, options);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
@@ -2654,9 +2698,7 @@ public class FSNamesystem implements Nam
       checkAncestorAccess(dst, FsAction.WRITE);
     }
 
-    HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
-    unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
   }
   
   /**
@@ -2671,7 +2713,7 @@ public class FSNamesystem implements Nam
     try {
       return deleteInt(src, recursive);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "delete", src, null, null);
@@ -2687,7 +2729,7 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     }
     boolean status = deleteInternal(src, recursive, true);
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (status && isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "delete", src, null, null);
@@ -2885,7 +2927,7 @@ public class FSNamesystem implements Nam
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "getfileinfo", src, null, null);
@@ -2894,7 +2936,7 @@ public class FSNamesystem implements Nam
     } finally {
       readUnlock();
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "getfileinfo", src, null, null);
@@ -2910,7 +2952,7 @@ public class FSNamesystem implements Nam
     try {
       return mkdirsInt(src, permissions, createParent);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "mkdirs", src, null, null);
@@ -2934,7 +2976,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (status && isAuditEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
@@ -3368,7 +3410,7 @@ public class FSNamesystem implements Nam
     try {
       return getListingInt(src, startAfter, needLocation);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "listStatus", src, null, null);
@@ -3392,7 +3434,7 @@ public class FSNamesystem implements Nam
           checkTraverse(src);
         }
       }
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "listStatus", src, null, null);
@@ -3482,15 +3524,7 @@ public class FSNamesystem implements Nam
 
   private NNHAStatusHeartbeat createHaStatusHeartbeat() {
     HAState state = haContext.getState();
-    NNHAStatusHeartbeat.State hbState;
-    if (state instanceof ActiveState) {
-      hbState = NNHAStatusHeartbeat.State.ACTIVE;
-    } else if (state instanceof StandbyState) {
-      hbState = NNHAStatusHeartbeat.State.STANDBY;      
-    } else {
-      throw new AssertionError("Invalid state: " + state.getClass());
-    }
-    return new NNHAStatusHeartbeat(hbState,
+    return new NNHAStatusHeartbeat(state.getServiceState(),
         getFSImage().getLastAppliedOrWrittenTxId());
   }
 
@@ -3929,7 +3963,7 @@ public class FSNamesystem implements Nam
     private synchronized void leave() {
       // if not done yet, initialize replication queues.
       // In the standby, do not populate repl queues
-      if (!isPopulatingReplQueues() && !isInStandbyState()) {
+      if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
         initializeReplQueues();
       }
       long timeInSafemode = now() - startTime;
@@ -3972,7 +4006,8 @@ public class FSNamesystem implements Nam
      * initializing replication queues.
      */
     private synchronized boolean canInitializeReplQueues() {
-      return !isInStandbyState() && blockSafe >= blockReplQueueThreshold;
+      return shouldPopulateReplQueues()
+          && blockSafe >= blockReplQueueThreshold;
     }
       
     /** 
@@ -4312,7 +4347,7 @@ public class FSNamesystem implements Nam
 
   @Override
   public boolean isPopulatingReplQueues() {
-    if (isInStandbyState()) {
+    if (!shouldPopulateReplQueues()) {
       return false;
     }
     // safeMode is volatile, and may be set to null at any time
@@ -4321,7 +4356,13 @@ public class FSNamesystem implements Nam
       return true;
     return safeMode.isPopulatingReplQueues();
   }
-    
+
+  private boolean shouldPopulateReplQueues() {
+    if(haContext == null || haContext.getState() == null)
+      return false;
+    return haContext.getState().shouldPopulateReplQueues();
+  }
+
   @Override
   public void incrementSafeBlockCount(int replication) {
     // safeMode is volatile, and may be set to null at any time
@@ -4939,31 +4980,9 @@ public class FSNamesystem implements Nam
 
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
-  void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
-    String overwrite;
-    String replaceBy;
+  void unprotectedChangeLease(String src, String dst) {
     assert hasWriteLock();
-
-    boolean destinationExisted = true;
-    if (dinfo == null) {
-      destinationExisted = false;
-    }
-
-    if (destinationExisted && dinfo.isDir()) {
-      Path spath = new Path(src);
-      Path parent = spath.getParent();
-      if (parent.isRoot()) {
-        overwrite = parent.toString();
-      } else {
-        overwrite = parent.toString() + Path.SEPARATOR;
-      }
-      replaceBy = dst + Path.SEPARATOR;
-    } else {
-      overwrite = src;
-      replaceBy = dst;
-    }
-
-    leaseManager.changeLease(src, dst, overwrite, replaceBy);
+    leaseManager.changeLease(src, dst);
   }
 
   /**
@@ -4974,19 +4993,13 @@ public class FSNamesystem implements Nam
     // lock on our behalf. If we took the read lock here, we could block
     // for fairness if a writer is waiting on the lock.
     synchronized (leaseManager) {
-      out.writeInt(leaseManager.countPath()); // write the size
-
-      for (Lease lease : leaseManager.getSortedLeases()) {
-        for(String path : lease.getPaths()) {
-          // verify that path exists in namespace
-          final INodeFileUnderConstruction cons;
-          try {
-            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
-          } catch (UnresolvedLinkException e) {
-            throw new AssertionError("Lease files should reside on this FS");
-          }
-          FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
-        }
+      Map<String, INodeFileUnderConstruction> nodes =
+          leaseManager.getINodesUnderConstruction();
+      out.writeInt(nodes.size()); // write the size    
+      for (Map.Entry<String, INodeFileUnderConstruction> entry
+           : nodes.entrySet()) {
+        FSImageSerialization.writeINodeUnderConstruction(
+            out, entry.getValue(), entry.getKey());
       }
     }
   }
@@ -5345,7 +5358,7 @@ public class FSNamesystem implements Nam
    * Log fsck event in the audit log 
    */
   void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
-    if (auditLog.isInfoEnabled()) {
+    if (isAuditEnabled()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     remoteAddress,
                     "fsck", src, null, null);
@@ -5729,4 +5742,44 @@ public class FSNamesystem implements Nam
           newSnapshotRoot.toString(), null);
     }
   }
+
+  /**
+   * Default AuditLogger implementation; used when no access logger is
+   * defined in the config file. It can also be explicitly listed in the
+   * config file.
+   */
+  private static class DefaultAuditLogger implements AuditLogger {
+
+    @Override
+    public void initialize(Configuration conf) {
+      // Nothing to do.
+    }
+
+    @Override
+    public void logAuditEvent(boolean succeeded, String userName,
+        InetAddress addr, String cmd, String src, String dst,
+        FileStatus status) {
+      if (auditLog.isInfoEnabled()) {
+        final StringBuilder sb = auditBuffer.get();
+        sb.setLength(0);
+        sb.append("allowed=").append(succeeded).append("\t");
+        sb.append("ugi=").append(userName).append("\t");
+        sb.append("ip=").append(addr).append("\t");
+        sb.append("cmd=").append(cmd).append("\t");
+        sb.append("src=").append(src).append("\t");
+        sb.append("dst=").append(dst).append("\t");
+        if (null == status) {
+          sb.append("perm=null");
+        } else {
+          sb.append("perm=");
+          sb.append(status.getOwner()).append(":");
+          sb.append(status.getGroup()).append(":");
+          sb.append(status.getPermission());
+        }
+        auditLog.info(sb);
+      }
+    }
+
+  }
+
 }
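
A custom logger plugged in via dfs.namenode.audit.loggers only needs to implement the two methods shown in DefaultAuditLogger above. A minimal sketch, with an illustrative package and class name that are not part of this commit:

    package org.example.audit;  // illustrative, not part of this commit

    import java.net.InetAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hdfs.server.namenode.AuditLogger;

    public class StdoutAuditLogger implements AuditLogger {
      @Override
      public void initialize(Configuration conf) {
        // Nothing to configure in this sketch.
      }

      @Override
      public void logAuditEvent(boolean succeeded, String userName,
          InetAddress addr, String cmd, String src, String dst,
          FileStatus status) {
        // Called on NameNode request paths for every audit event, so keep it cheap.
        System.out.println("allowed=" + succeeded + "\tugi=" + userName
            + "\tip=" + addr + "\tcmd=" + cmd + "\tsrc=" + src + "\tdst=" + dst);
      }
    }

Such a class is then listed in dfs.namenode.audit.loggers; see the hdfs-default.xml hunk below.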

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Dec 10 03:37:32 2012
@@ -80,6 +80,11 @@ public class INodeDirectory extends INod
   public INodeDirectory(INodeDirectory other) {
     super(other);
     this.children = other.children;
+    if (this.children != null) {
+      for (INode child : children) {
+        child.parent = this;
+      }
+    }
   }
   
   /** @return true unconditionally. */
@@ -118,6 +123,7 @@ public class INodeDirectory extends INod
 
     final int low = searchChildren(newChild);
     if (low>=0) { // an old child exists so replace by the newChild
+      children.get(low).parent = null;
       children.set(low, newChild);
     } else {
       throw new IllegalArgumentException("No child exists to be replaced");

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Mon Dec 10 03:37:32 2012
@@ -331,22 +331,19 @@ public class LeaseManager {
     }
   }
 
-  synchronized void changeLease(String src, String dst,
-      String overwrite, String replaceBy) {
+  synchronized void changeLease(String src, String dst) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".changelease: " +
-               " src=" + src + ", dest=" + dst + 
-               ", overwrite=" + overwrite +
-               ", replaceBy=" + replaceBy);
+               " src=" + src + ", dest=" + dst);
     }
 
-    final int len = overwrite.length();
+    final int len = src.length();
     for(Map.Entry<String, Lease> entry
         : findLeaseWithPrefixPath(src, sortedLeasesByPath).entrySet()) {
       final String oldpath = entry.getKey();
       final Lease lease = entry.getValue();
-      //overwrite must be a prefix of oldpath
-      final String newpath = replaceBy + oldpath.substring(len);
+      // replace stem of src with new destination
+      final String newpath = dst + oldpath.substring(len);
       if (LOG.isDebugEnabled()) {
         LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
       }
@@ -429,6 +426,26 @@ public class LeaseManager {
     }
   }
 
+  /**
+   * Get the list of inodes corresponding to valid leases.
+   * @return list of inodes
+   * @throws UnresolvedLinkException
+   */
+  Map<String, INodeFileUnderConstruction> getINodesUnderConstruction() {
+    Map<String, INodeFileUnderConstruction> inodes =
+        new TreeMap<String, INodeFileUnderConstruction>();
+    for (String p : sortedLeasesByPath.keySet()) {
+      // verify that path exists in namespace
+      try {
+        INode node = fsnamesystem.dir.getINode(p);
+        inodes.put(p, INodeFileUnderConstruction.valueOf(node, p));
+      } catch (IOException ioe) {
+        LOG.error(ioe);
+      }
+    }
+    return inodes;
+  }
+  
   /** Check the leases beginning from the oldest.
    *  @return true is sync is needed.
    */
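
The simplified changeLease above rewrites each lease path by replacing its src stem with dst. A standalone illustration of that rewrite, with made-up paths:

    // Mirrors the stem replacement in changeLease:
    //   newpath = dst + oldpath.substring(src.length())
    String src = "/test-d";
    String dst = "/test-d-other";
    String oldpath = "/test-d/test-file";
    String newpath = dst + oldpath.substring(src.length());
    // newpath is "/test-d-other/test-file"

This works because findLeaseWithPrefixPath only returns entries whose paths start with src, so src is always a prefix of oldpath.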

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Dec 10 03:37:32 2012
@@ -598,11 +598,7 @@ public class NameNode {
     String nsId = getNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
-    if (!haEnabled) {
-      state = ACTIVE_STATE;
-    } else {
-      state = STANDBY_STATE;
-    }
+    state = createHAState();
     this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
     this.haContext = createHAContext();
     try {
@@ -619,6 +615,10 @@ public class NameNode {
     }
   }
 
+  protected HAState createHAState() {
+    return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
+  }
+
   protected HAContext createHAContext() {
     return new NameNodeHAContext();
   }
@@ -1050,6 +1050,9 @@ public class NameNode {
 
   private static void doRecovery(StartupOption startOpt, Configuration conf)
       throws IOException {
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+    initializeGenericKeys(conf, nsId, namenodeId);
     if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
       if (!confirmPrompt("You have selected Metadata Recovery mode.  " +
           "This mode is intended to recover lost metadata on a corrupt " +
@@ -1298,7 +1301,7 @@ public class NameNode {
    *          before exit.
    * @throws ExitException thrown only for testing.
    */
-  private synchronized void doImmediateShutdown(Throwable t)
+  protected synchronized void doImmediateShutdown(Throwable t)
       throws ExitException {
     String message = "Error encountered requiring NN shutdown. " +
         "Shutting down immediately.";

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Mon Dec 10 03:37:32 2012
@@ -102,7 +102,7 @@ class NamenodeJspHelper {
     long usedNonHeap = (totalNonHeap * 100) / commitedNonHeap;
 
     String str = "<div>" + inodes + " files and directories, " + blocks + " blocks = "
-        + (inodes + blocks) + " total";
+        + (inodes + blocks) + " total filesystem objects";
     if (maxobjects != 0) {
       long pct = ((inodes + blocks) * 100) / maxobjects;
       str += " / " + maxobjects + " (" + pct + "%)";

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java Mon Dec 10 03:37:32 2012
@@ -19,31 +19,26 @@ package org.apache.hadoop.hdfs.server.pr
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class NNHAStatusHeartbeat {
 
-  private State state;
+  private HAServiceState state;
   private long txid = HdfsConstants.INVALID_TXID;
   
-  public NNHAStatusHeartbeat(State state, long txid) {
+  public NNHAStatusHeartbeat(HAServiceState state, long txid) {
     this.state = state;
     this.txid = txid;
   }
 
-  public State getState() {
+  public HAServiceState getState() {
     return state;
   }
   
   public long getTxId() {
     return txid;
   }
-  
-  @InterfaceAudience.Private
-  public enum State {
-    ACTIVE,
-    STANDBY;
-  }
 }
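
With the private State enum removed, heartbeats are constructed directly from the shared HAServiceState, as createHaStatusHeartbeat now does in FSNamesystem. A fragment showing the reworked API, with an arbitrary txid:

    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;

    // Illustrative only; the txid value is arbitrary.
    NNHAStatusHeartbeat hb = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 42L);
    assert hb.getState() == HAServiceState.STANDBY;  // no parallel enum to translate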

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UserParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UserParam.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UserParam.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UserParam.java Mon Dec 10 03:37:32 2012
@@ -38,10 +38,9 @@ public class UserParam extends StringPar
         MessageFormat.format("Parameter [{0}], cannot be NULL", NAME));
     }
     int len = str.length();
-    if (len < 1 || len > 31) {
+    if (len < 1) {
       throw new IllegalArgumentException(MessageFormat.format(
-        "Parameter [{0}], invalid value [{1}], it's length must be between 1 and 31",
-        NAME, str));
+        "Parameter [{0}], it's length must be at least 1", NAME));
     }
     return str;
   }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1415804-1419190

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Dec 10 03:37:32 2012
@@ -1184,4 +1184,17 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.audit.loggers</name>
+  <value>default</value>
+  <description>
+    List of classes implementing audit loggers that will receive audit events.
+    These should be implementations of org.apache.hadoop.hdfs.server.namenode.AuditLogger.
+    The special value "default" can be used to reference the default audit
+    logger, which uses the configured log system. Installing custom audit loggers
+    may affect the performance and stability of the NameNode. Refer to the custom
+    logger's documentation for more details.
+  </description>
+</property>
+
 </configuration>
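
Because the value is read with Configuration.getStringCollection, several loggers may be listed, comma separated. For example, to keep the default logger and add a custom one (the class name here is illustrative):

    <property>
      <name>dfs.namenode.audit.loggers</name>
      <value>default,org.example.audit.StdoutAuditLogger</value>
    </property>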

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1415804-1419190

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1415804-1419190

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1415804-1419190

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1415804-1419190

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsCreateMkdir.java Mon Dec 10 03:37:32 2012
@@ -41,6 +41,7 @@ public class TestFcHdfsCreateMkdir exten
   @BeforeClass
   public static void clusterSetupAtBegining()
                                     throws IOException, LoginException, URISyntaxException  {
+    FileContextTestHelper.TEST_ROOT_DIR = "/tmp/TestFcHdfsCreateMkdir";
     Configuration conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     fc = FileContext.getFileContext(cluster.getURI(0), conf);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsPermission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsPermission.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsPermission.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsPermission.java Mon Dec 10 03:37:32 2012
@@ -41,6 +41,7 @@ public class TestFcHdfsPermission extend
   @BeforeClass
   public static void clusterSetupAtBegining()
                                     throws IOException, LoginException, URISyntaxException  {
+    FileContextTestHelper.TEST_ROOT_DIR = "/tmp/TestFcHdfsPermission";
     Configuration conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     fc = FileContext.getFileContext(cluster.getURI(0), conf);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSetUMask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSetUMask.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSetUMask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSetUMask.java Mon Dec 10 03:37:32 2012
@@ -82,6 +82,7 @@ public class TestFcHdfsSetUMask {
   @BeforeClass
   public static void clusterSetupAtBegining()
         throws IOException, LoginException, URISyntaxException  {
+    FileContextTestHelper.TEST_ROOT_DIR = "/tmp/TestFcHdfsSetUMask";
     Configuration conf = new HdfsConfiguration();
     // set permissions very restrictive
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "077");

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSymlink.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSymlink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestFcHdfsSymlink.java Mon Dec 10 03:37:32 2012
@@ -86,6 +86,7 @@ public class TestFcHdfsSymlink extends F
 
   @BeforeClass
   public static void testSetUp() throws Exception {
+    FileContextTestHelper.TEST_ROOT_DIR = "/tmp/TestFcHdfsSymlink";
     Configuration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
     conf.set(FsPermission.UMASK_LABEL, "000");

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java Mon Dec 10 03:37:32 2012
@@ -49,6 +49,8 @@ public class TestHDFSFileContextMainOper
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
+    FileContextTestHelper.TEST_ROOT_DIR =
+      "/tmp/TestHDFSFileContextMainOperations";
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
     cluster.waitClusterUp();
     fc = FileContext.getFileContext(cluster.getURI(0), CONF);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemAtHdfsRoot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemAtHdfsRoot.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemAtHdfsRoot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemAtHdfsRoot.java Mon Dec 10 03:37:32 2012
@@ -25,6 +25,7 @@ import javax.security.auth.login.LoginEx
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -45,6 +46,7 @@ public class TestViewFileSystemAtHdfsRoo
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
+    FileSystemTestHelper.TEST_ROOT_DIR = "/tmp/TestViewFileSystemAtHdfsRoot";
     SupportsBlocks = true;
     CONF.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java Mon Dec 10 03:37:32 2012
@@ -51,6 +51,7 @@ public class TestViewFileSystemHdfs exte
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
+    FileSystemTestHelper.TEST_ROOT_DIR = "/tmp/TestViewFileSystemHdfs";
     SupportsBlocks = true;
     CONF.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsAtHdfsRoot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsAtHdfsRoot.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsAtHdfsRoot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsAtHdfsRoot.java Mon Dec 10 03:37:32 2012
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileContextTestHelper;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -46,6 +47,7 @@ public class TestViewFsAtHdfsRoot extend
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
+    FileContextTestHelper.TEST_ROOT_DIR = "/tmp/TestViewFsAtHdfsRoot";
     SupportsBlocks = true;
     CONF.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java Mon Dec 10 03:37:32 2012
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileContextTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -42,6 +43,7 @@ public class TestViewFsHdfs extends View
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
+    FileContextTestHelper.TEST_ROOT_DIR = "/tmp/TestViewFsHdfs";
     SupportsBlocks = true;
     CONF.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java Mon Dec 10 03:37:32 2012
@@ -30,7 +30,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -49,6 +51,10 @@ public class TestLease {
         ).getLeaseByPath(src.toString()) != null;
   }
 
+  static int leaseCount(MiniDFSCluster cluster) {
+    return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()).countLease();
+  }
+  
   static final String dirString = "/test/lease";
   final Path dir = new Path(dirString);
   static final Log LOG = LogFactory.getLog(TestLease.class);
@@ -127,6 +133,96 @@ public class TestLease {
   }
 
   @Test
+  public void testLeaseAfterRename() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    try {
+      Path p = new Path("/test-file");
+      Path d = new Path("/test-d");
+      Path d2 = new Path("/test-d-other");
+
+      // open a file to get a lease
+      FileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(p);
+      out.writeBytes("something");
+      //out.hsync();
+      Assert.assertTrue(hasLease(cluster, p));
+      Assert.assertEquals(1, leaseCount(cluster));
+      
+      // use a second fs instance so the renames cannot be short-circuited by
+      // any client-side lease handling in the fs that holds the lease
+      DistributedFileSystem fs2 = (DistributedFileSystem) FileSystem.newInstance(fs.getUri(), fs.getConf());
+
+      // rename the file into an existing dir
+      LOG.info("DMS: rename file into dir");
+      Path pRenamed = new Path(d, p.getName());
+      fs2.mkdirs(d);
+      fs2.rename(p, pRenamed);
+      Assert.assertFalse(p+" exists", fs2.exists(p));
+      Assert.assertTrue(pRenamed+" not found", fs2.exists(pRenamed));
+      Assert.assertFalse("has lease for "+p, hasLease(cluster, p));
+      Assert.assertTrue("no lease for "+pRenamed, hasLease(cluster, pRenamed));
+      Assert.assertEquals(1, leaseCount(cluster));
+    
+      // rename the parent dir to a new non-existent dir
+      LOG.info("DMS: rename parent dir");
+      Path pRenamedAgain = new Path(d2, pRenamed.getName());
+      fs2.rename(d, d2);
+      // src gone
+      Assert.assertFalse(d+" exists", fs2.exists(d));
+      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d2+" not found", fs2.exists(d2));
+      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+
+      // rename the parent dir to existing dir
+      // NOTE: rename without options moves the source *into* an existing dst dir
+      LOG.info("DMS: rename parent again");
+      pRenamed = pRenamedAgain;
+      pRenamedAgain = new Path(new Path(d, d2.getName()), p.getName());      
+      fs2.mkdirs(d);
+      fs2.rename(d2, d);
+      // src gone
+      Assert.assertFalse(d2+" exists", fs2.exists(d2));
+      Assert.assertFalse("no lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d+" not found", fs2.exists(d));
+      Assert.assertTrue(pRenamedAgain +" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+      
+      // rename with opts to non-existent dir
+      pRenamed = pRenamedAgain;
+      pRenamedAgain = new Path(d2, p.getName());
+      fs2.rename(pRenamed.getParent(), d2, Options.Rename.OVERWRITE);
+      // src gone
+      Assert.assertFalse(pRenamed.getParent() +" exists", fs2.exists(pRenamed.getParent()));
+      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d2+" not found", fs2.exists(d2));
+      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+
+      // rename with opts to existing dir
+      // NOTE: rename with options replaces the destination rather than moving into it
+      pRenamed = pRenamedAgain;
+      pRenamedAgain = new Path(d, p.getName());
+      fs2.rename(pRenamed.getParent(), d, Options.Rename.OVERWRITE);
+      // src gone
+      Assert.assertFalse(pRenamed.getParent() +" exists", fs2.exists(pRenamed.getParent()));
+      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d+" not found", fs2.exists(d));
+      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
   public void testLease() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     try {

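The two NOTE comments in testLeaseAfterRename capture the behavioral split the test depends on: the boolean FileSystem#rename(src, dst) moves src into dst when dst is an existing directory, while the Options.Rename overload (accessible on DistributedFileSystem, as the test shows) treats dst as the final name and, with OVERWRITE, replaces it. A minimal sketch under those assumptions, with /a and /b as hypothetical paths:

    // given an existing directory /b:
    fs.rename(new Path("/a"), new Path("/b"));
    //   -> /a is moved *into* /b, ending up as /b/a

    fs.rename(new Path("/a"), new Path("/b"), Options.Rename.OVERWRITE);
    //   -> /b itself is replaced by /a (if /b is a directory it must be
    //      empty for the overwrite to succeed); nothing is nested
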
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Mon Dec 10 03:37:32 2012
@@ -17,13 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +45,7 @@ import org.junit.Test;
 /**
  * This class tests if a balancer schedules tasks correctly.
  */
-public class TestBalancerWithNodeGroup extends TestCase {
+public class TestBalancerWithNodeGroup {
   private static final Log LOG = LogFactory.getLog(
   "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Mon Dec 10 03:37:32 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,8 @@ import org.junit.Test;
 public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
+  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
+  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
   private static final Configuration CONF = new HdfsConfiguration();
   private static final NetworkTopology cluster;
   private static final NameNode namenode;
@@ -61,6 +64,32 @@ public class TestReplicationPolicyWithNo
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
       DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
   };
+  
+  private final static DatanodeDescriptor dataNodesInBoundaryCase[] = 
+          new DatanodeDescriptor[] {
+      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
+  };
+  
+  private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
+          new DatanodeDescriptor[] {
+      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
+      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
+      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
+      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
+      DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
+      DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
+      DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
+      DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
+  };
 
   private final static DatanodeDescriptor NODE = 
       new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
@@ -74,6 +103,12 @@ public class TestReplicationPolicyWithNo
           "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
       CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
           "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+      
+      File baseDir = new File(System.getProperty(
+          "test.build.data", "build/test/data"), "dfs/");
+      CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          new File(baseDir, "name").getPath());
+      
       DFSTestUtil.formatNameNode(CONF);
       namenode = new NameNode(CONF);
     } catch (IOException e) {
@@ -97,7 +132,27 @@ public class TestReplicationPolicyWithNo
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
   }
-
+  
+  /**
+   * Scan the targets list: all targets should be on different NodeGroups.
+   * Return false if two targets are found on the same NodeGroup.
+   */
+  private static boolean checkTargetsOnDifferentNodeGroup(
+      DatanodeDescriptor[] targets) {
+    Set<String> targetSet = new HashSet<String>();
+    for (DatanodeDescriptor node : targets) {
+      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
+      // Set.add returns false when the node group has been seen already
+      if (!targetSet.add(nodeGroup)) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
@@ -497,5 +552,122 @@ public class TestReplicationPolicyWithNo
         null, null, (short)1, first, second);
     assertEquals(chosenNode, dataNodes[5]);
   }
+  
+  /**
+   * Test replica placement policy in case of boundary topology.
+   * Rack 2 has only one node group, so it cannot hold two replicas.
+   * The 1st replica is placed on the writer.
+   * The 2nd replica should be placed on a different rack.
+   * The 3rd replica should be placed on the same rack as the writer, but in a
+   * different node group.
+   */
+  @Test
+  public void testChooseTargetsOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      cluster.add(dataNodesInBoundaryCase[i]);
+    }
+    // refresh the writer's heartbeat once, then every boundary-case node
+    dataNodes[0].updateHeartbeat(
+        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(0, targets.length);
+    
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(1, targets.length);
+
+    targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(2, targets.length);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(3, targets.length);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+
+  /**
+   * Test the re-replication policy in the boundary case.
+   * Rack 2 has only one node group, and the node in that node group is chosen;
+   * Rack 1 has two node groups, and a node from one of them is chosen.
+   * The policy should pick the new replica from the remaining node group of
+   * Rack 1, never from a node group that already holds a chosen node.
+   */
+  @Test
+  public void testRereplicateOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodesInBoundaryCase[0]);
+    chosenNodes.add(dataNodesInBoundaryCase[5]);
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        chosenNodes, BLOCK_SIZE);
+    assertFalse(cluster.isOnSameNodeGroup(targets[0], 
+        dataNodesInBoundaryCase[0]));
+    assertFalse(cluster.isOnSameNodeGroup(targets[0],
+        dataNodesInBoundaryCase[5]));
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+  
+  /**
+   * Test the replica placement policy when more targets are requested than
+   * there are NodeGroups.
+   * The 12-node cluster has only 6 NodeGroups, but some callers (e.g. placing
+   * a submitted job file) ask for more targets (10). The policy should return
+   * at most one target per NodeGroup, i.e. 6 targets.
+   */
+  @Test
+  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
+    // Cleanup nodes in previous tests
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      cluster.add(dataNodesInMoreTargetsCase[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      dataNodesInMoreTargetsCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    // Test normal case -- 3 replicas
+    targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(3, targets.length);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+
+    // Test special case -- replica number over node groups.
+    targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+    // Verify it only can find 6 targets for placing replicas.
+    assertEquals(6, targets.length);
+  }
+
 
 }

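checkTargetsOnDifferentNodeGroup keys its set on NetworkTopology.getLastHalf, which, assuming the usual implementation that splits the location string at its last path separator, isolates the node-group component of a location:

    // assumption: getFirstHalf/getLastHalf split at the last "/"
    NetworkTopology.getLastHalf("/d1/r1/n1");   // -> "/n1"    (node group)
    NetworkTopology.getFirstHalf("/d1/r1/n1");  // -> "/d1/r1" (dc + rack)
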
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Mon Dec 10 03:37:32 2012
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
-import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -123,7 +123,7 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.anyInt());
-    mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(State.STANDBY, 0);
+    mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     return mock;
   }
   
@@ -255,12 +255,12 @@ public class TestBPOfferService {
       assertNull(bpos.getActiveNN());
 
       // Have NN1 claim active at txid 1
-      mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 1);
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
       bpos.triggerHeartbeatForTests();
       assertSame(mockNN1, bpos.getActiveNN());
 
       // NN2 claims active at a higher txid
-      mockHaStatuses[1] = new NNHAStatusHeartbeat(State.ACTIVE, 2);
+      mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 2);
       bpos.triggerHeartbeatForTests();
       assertSame(mockNN2, bpos.getActiveNN());
       
@@ -272,12 +272,12 @@ public class TestBPOfferService {
       // Even if NN2 goes to standby, DN shouldn't reset to talking to NN1,
       // because NN1's txid is lower than the last active txid. Instead,
       // it should consider neither active.
-      mockHaStatuses[1] = new NNHAStatusHeartbeat(State.STANDBY, 2);
+      mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 2);
       bpos.triggerHeartbeatForTests();
       assertNull(bpos.getActiveNN());
       
       // Now if NN1 goes back to a higher txid, it should be considered active
-      mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 3);
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 3);
       bpos.triggerHeartbeatForTests();
       assertSame(mockNN1, bpos.getActiveNN());
 

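The churn in this file is a mechanical migration: NNHAStatusHeartbeat now takes the shared org.apache.hadoop.ha.HAServiceProtocol.HAServiceState enum instead of its own nested State. A before/after sketch of the only call shape the test uses:

    // before: enum nested in the protocol class
    new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 1);
    // after: shared HA enum from hadoop-common
    new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
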
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Mon Dec 10 03:37:32 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -72,7 +73,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
-import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -157,7 +157,7 @@ public class TestBlockRecovery {
             Mockito.anyInt()))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
-            new NNHAStatusHeartbeat(State.ACTIVE, 1)));
+            new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
 
     dn = new DataNode(conf, dirs, null) {
       @Override

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Mon Dec 10 03:37:32 2012
@@ -179,6 +179,13 @@ public class NameNodeAdapter {
     return spy;
   }
   
+  public static JournalSet spyOnJournalSet(NameNode nn) {
+    FSEditLog editLog = nn.getFSImage().getEditLog();
+    JournalSet js = Mockito.spy(editLog.getJournalSet());
+    editLog.setJournalSetForTesting(js);
+    return js;
+  }
+  
   public static String getMkdirOpPath(FSEditLogOp op) {
     if (op.opCode == FSEditLogOpCodes.OP_MKDIR) {
       return ((MkdirOp) op).path;

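spyOnJournalSet mirrors the existing spy helpers in NameNodeAdapter: it swaps a Mockito spy into the edit log so a test can observe or perturb journal traffic. A hypothetical usage, assuming JournalSet exposes the JournalManager-style finalizeLogSegment(long, long):

    JournalSet spy = NameNodeAdapter.spyOnJournalSet(cluster.getNameNode());
    // ... run operations that roll the edit log ...
    Mockito.verify(spy, Mockito.atLeastOnce())
           .finalizeLogSegment(Mockito.anyLong(), Mockito.anyLong());
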
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java Mon Dec 10 03:37:32 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -103,6 +104,9 @@ public class TestBackupNode {
     BackupNode bn = (BackupNode)NameNode.createNameNode(
         new String[]{startupOpt.getName()}, c);
     assertTrue(bn.getRole() + " must be in SafeMode.", bn.isInSafeMode());
+    assertTrue(bn.getRole() + " must be in StandbyState",
+               bn.getNamesystem().getHAState()
+                 .equalsIgnoreCase(HAServiceState.STANDBY.name()));
     return bn;
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java Mon Dec 10 03:37:32 2012
@@ -25,10 +25,15 @@ import static org.junit.Assert.fail;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.junit.Test;
 
@@ -157,6 +162,48 @@ public class TestINodeFile {
     
   }
   
+  /**
+   * FSDirectory#unprotectedSetQuota creates a new INodeDirectoryWithQuota to
+   * replace the original INodeDirectory. Before HDFS-4243, the parent fields
+   * of the children INodes of the target INodeDirectory were not updated to
+   * point to the new INodeDirectoryWithQuota. This testcase covers that
+   * scenario.
+   */
+  @Test
+  public void testGetFullPathNameAfterSetQuota() throws Exception {
+    long fileLen = 1024;
+    replication = 3;
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        replication).build();
+    cluster.waitActive();
+    FSNamesystem fsn = cluster.getNamesystem();
+    FSDirectory fsdir = fsn.getFSDirectory();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    
+    // Create a file for test
+    final Path dir = new Path("/dir");
+    final Path file = new Path(dir, "file");
+    DFSTestUtil.createFile(dfs, file, fileLen, replication, 0L);
+    
+    // Check the full path name of the INode associating with the file
+    INode fnode = fsdir.getINode(file.toString());
+    assertEquals(file.toString(), fnode.getFullPathName());
+    
+    // Call FSDirectory#unprotectedSetQuota which calls
+    // INodeDirectory#replaceChild
+    dfs.setQuota(dir, Long.MAX_VALUE - 1, replication * fileLen * 10);
+    final Path newDir = new Path("/newdir");
+    final Path newFile = new Path(newDir, "file");
+    // Also rename dir
+    dfs.rename(dir, newDir, Options.Rename.OVERWRITE);
+    // /dir/file now should be renamed to /newdir/file
+    fnode = fsdir.getINode(newFile.toString());
+    // getFullPathName can return correct result only if the parent field of
+    // child node is set correctly
+    assertEquals(newFile.toString(), fnode.getFullPathName());
+  }
+  
   @Test
   public void testAppendBlocks() {
     INodeFile origFile = createINodeFiles(1, "origfile")[0];

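The bug class here is easy to state: getFullPathName reconstructs a path by walking child-to-parent links, so a stale parent pointer left behind by replaceChild silently yields the pre-rename path. A minimal sketch of that walk, illustrative only and not the FSDirectory code:

    // hypothetical node type with a parent back-pointer
    static class N { N parent; String name; }

    static String fullPath(N n) {
      StringBuilder sb = new StringBuilder();
      // the root has parent == null and contributes no component
      for (; n != null && n.parent != null; n = n.parent) {
        sb.insert(0, "/" + n.name);   // prepend this component
      }
      return sb.length() == 0 ? "/" : sb.toString();
    }
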
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java Mon Dec 10 03:37:32 2012
@@ -92,7 +92,8 @@ public class TestMetaSave {
     DataInputStream in = new DataInputStream(fstream);
     BufferedReader reader = new BufferedReader(new InputStreamReader(in));
     String line = reader.readLine();
-    assertTrue(line.equals("3 files and directories, 2 blocks = 5 total"));
+    assertTrue(line.equals(
+      "3 files and directories, 2 blocks = 5 total filesystem objects"));
     line = reader.readLine();
     assertTrue(line.equals("Live Datanodes: 1"));
     line = reader.readLine();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java Mon Dec 10 03:37:32 2012
@@ -30,11 +30,16 @@ import java.io.RandomAccessFile;
 import java.util.HashSet;
 import java.util.Set;
 
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -438,6 +443,39 @@ public class TestNameNodeRecovery {
     }
   }
 
+  /**
+   * Create a test configuration that will exercise the initializeGenericKeys
+   * code path.  This is a regression test for HDFS-4279.
+   */
+  static void setupRecoveryTestConf(Configuration conf) throws IOException {
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1");
+    conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+    conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
+      "ns1"), "nn1,nn2");
+    String baseDir = System.getProperty(
+        MiniDFSCluster.PROP_TEST_BUILD_DATA, "build/test/data") + "/dfs/";
+    File nameDir = new File(baseDir, "nameR");
+    File secondaryDir = new File(baseDir, "namesecondaryR");
+    conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.
+        DFS_NAMENODE_NAME_DIR_KEY, "ns1", "nn1"),
+        nameDir.getCanonicalPath());
+    conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.
+        DFS_NAMENODE_CHECKPOINT_DIR_KEY, "ns1", "nn1"),
+        secondaryDir.getCanonicalPath());
+    conf.unset(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+    conf.unset(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
+    FileUtils.deleteQuietly(nameDir);
+    if (!nameDir.mkdirs()) {
+      throw new RuntimeException("failed to make directory " +
+        nameDir.getAbsolutePath());
+    }
+    FileUtils.deleteQuietly(secondaryDir);
+    if (!secondaryDir.mkdirs()) {
+      throw new RuntimeException("failed to make directory " +
+        secondaryDir.getAbsolutePath());
+    }
+  }
+
   static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
       throws IOException {
     final String TEST_PATH = "/test/path/dir";
@@ -446,12 +484,13 @@ public class TestNameNodeRecovery {
 
     // start a cluster
     Configuration conf = new HdfsConfiguration();
+    setupRecoveryTestConf(conf);
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     StorageDirectory sd = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
-          .enableManagedDfsDirsRedundancy(false).build();
+          .manageNameDfsDirs(false).build();
       cluster.waitActive();
       if (!finalize) {
         // Normally, the in-progress edit log would be finalized by

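setupRecoveryTestConf leans on DFSUtil.addKeySuffixes, which, per its usual behavior, appends each suffix to the key with a dot, producing the federated/HA per-namenode key names:

    // assumption: suffixes are dot-joined onto the base key
    DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, "ns1", "nn1");
    //   -> "dfs.namenode.name.dir.ns1.nn1"
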

