hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1555021 [14/15] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apach...
Date Fri, 03 Jan 2014 07:27:01 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Fri Jan  3 07:26:52 2014
@@ -21,8 +21,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -34,6 +35,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -43,6 +45,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
@@ -61,10 +64,13 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -92,25 +98,50 @@ public class TestCacheDirectives {
   static private MiniDFSCluster cluster;
   static private DistributedFileSystem dfs;
   static private NamenodeProtocols proto;
+  static private NameNode namenode;
   static private CacheManipulator prevCacheManipulator;
 
   static {
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
     EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
   }
 
-  @Before
-  public void setup() throws Exception {
-    conf = new HdfsConfiguration();
+  private static final long BLOCK_SIZE = 4096;
+  private static final int NUM_DATANODES = 4;
+  // Most Linux installs will allow non-root users to lock 64KB.
+  // In this test though, we stub out mlock so this doesn't matter.
+  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+  private static HdfsConfiguration createCachingConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
     // set low limits here for testing purposes
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
+        2);
+
+    return conf;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = createCachingConf();
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
     proto = cluster.getNameNodeRpc();
+    namenode = cluster.getNameNode();
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
-    LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
+    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(CacheManager.class.getName()).setLevel(
+        Level.TRACE);
   }
 
   @After
@@ -127,7 +158,7 @@ public class TestCacheDirectives {
     final String poolName = "pool1";
     CachePoolInfo info = new CachePoolInfo(poolName).
         setOwnerName("bob").setGroupName("bobgroup").
-        setMode(new FsPermission((short)0755)).setWeight(150);
+        setMode(new FsPermission((short)0755)).setLimit(150l);
 
     // Add a pool
     dfs.addCachePool(info);
@@ -168,7 +199,7 @@ public class TestCacheDirectives {
 
     // Modify the pool
     info.setOwnerName("jane").setGroupName("janegroup")
-        .setMode(new FsPermission((short)0700)).setWeight(314);
+        .setMode(new FsPermission((short)0700)).setLimit(314l);
     dfs.modifyCachePool(info);
 
     // Do some invalid modify pools
@@ -263,10 +294,10 @@ public class TestCacheDirectives {
     String ownerName = "abc";
     String groupName = "123";
     FsPermission mode = new FsPermission((short)0755);
-    int weight = 150;
+    long limit = 150;
     dfs.addCachePool(new CachePoolInfo(poolName).
         setOwnerName(ownerName).setGroupName(groupName).
-        setMode(mode).setWeight(weight));
+        setMode(mode).setLimit(limit));
     
     RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
     CachePoolInfo info = iter.next().getInfo();
@@ -277,10 +308,10 @@ public class TestCacheDirectives {
     ownerName = "def";
     groupName = "456";
     mode = new FsPermission((short)0700);
-    weight = 151;
+    limit = 151;
     dfs.modifyCachePool(new CachePoolInfo(poolName).
         setOwnerName(ownerName).setGroupName(groupName).
-        setMode(mode).setWeight(weight));
+        setMode(mode).setLimit(limit));
 
     iter = dfs.listCachePools();
     info = iter.next().getInfo();
@@ -288,7 +319,7 @@ public class TestCacheDirectives {
     assertEquals(ownerName, info.getOwnerName());
     assertEquals(groupName, info.getGroupName());
     assertEquals(mode, info.getMode());
-    assertEquals(Integer.valueOf(weight), info.getWeight());
+    assertEquals(limit, (long)info.getLimit());
 
     dfs.removeCachePool(poolName);
     iter = dfs.listCachePools();
@@ -495,30 +526,22 @@ public class TestCacheDirectives {
 
   @Test(timeout=60000)
   public void testCacheManagerRestart() throws Exception {
-    cluster.shutdown();
-    cluster = null;
-    HdfsConfiguration conf = createCachingConf();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
-
-    cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-
     // Create and validate a pool
     final String pool = "poolparty";
     String groupName = "partygroup";
     FsPermission mode = new FsPermission((short)0777);
-    int weight = 747;
+    long limit = 747;
     dfs.addCachePool(new CachePoolInfo(pool)
         .setGroupName(groupName)
         .setMode(mode)
-        .setWeight(weight));
+        .setLimit(limit));
     RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
     assertTrue("No cache pools found", pit.hasNext());
     CachePoolInfo info = pit.next().getInfo();
     assertEquals(pool, info.getPoolName());
     assertEquals(groupName, info.getGroupName());
     assertEquals(mode, info.getMode());
-    assertEquals(weight, (int)info.getWeight());
+    assertEquals(limit, (long)info.getLimit());
     assertFalse("Unexpected # of cache pools found", pit.hasNext());
   
     // Create some cache entries
@@ -556,7 +579,7 @@ public class TestCacheDirectives {
     assertEquals(pool, info.getPoolName());
     assertEquals(groupName, info.getGroupName());
     assertEquals(mode, info.getMode());
-    assertEquals(weight, (int)info.getWeight());
+    assertEquals(limit, (long)info.getLimit());
     assertFalse("Unexpected # of cache pools found", pit.hasNext());
   
     dit = dfs.listCacheDirectives(null);
@@ -762,235 +785,173 @@ public class TestCacheDirectives {
         numCachedReplicas);
   }
 
-  private static final long BLOCK_SIZE = 512;
-  private static final int NUM_DATANODES = 4;
-
-  // Most Linux installs will allow non-root users to lock 64KB.
-  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
-
-  private static HdfsConfiguration createCachingConf() {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
-    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
-    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
-    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
-    return conf;
-  }
-
   @Test(timeout=120000)
   public void testWaitForCachedReplicas() throws Exception {
-    HdfsConfiguration conf = createCachingConf();
     FileSystemTestHelper helper = new FileSystemTestHelper();
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      final NameNode namenode = cluster.getNameNode();
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          return ((namenode.getNamesystem().getCacheCapacity() ==
-              (NUM_DATANODES * CACHE_CAPACITY)) &&
-                (namenode.getNamesystem().getCacheUsed() == 0));
-        }
-      }, 500, 60000);
-
-      NamenodeProtocols nnRpc = namenode.getRpcServer();
-      Path rootDir = helper.getDefaultWorkingDirectory(dfs);
-      // Create the pool
-      final String pool = "friendlyPool";
-      nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
-      // Create some test files
-      final int numFiles = 2;
-      final int numBlocksPerFile = 2;
-      final List<String> paths = new ArrayList<String>(numFiles);
-      for (int i=0; i<numFiles; i++) {
-        Path p = new Path(rootDir, "testCachePaths-" + i);
-        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
-            (int)BLOCK_SIZE);
-        paths.add(p.toUri().getPath());
-      }
-      // Check the initial statistics at the namenode
-      waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
-      // Cache and check each path in sequence
-      int expected = 0;
-      for (int i=0; i<numFiles; i++) {
-        CacheDirectiveInfo directive =
-            new CacheDirectiveInfo.Builder().
-              setPath(new Path(paths.get(i))).
-              setPool(pool).
-              build();
-        nnRpc.addCacheDirective(directive);
-        expected += numBlocksPerFile;
-        waitForCachedBlocks(namenode, expected, expected,
-            "testWaitForCachedReplicas:1");
-      }
-      // Uncache and check each path in sequence
-      RemoteIterator<CacheDirectiveEntry> entries =
-        new CacheDirectiveIterator(nnRpc, null);
-      for (int i=0; i<numFiles; i++) {
-        CacheDirectiveEntry entry = entries.next();
-        nnRpc.removeCacheDirective(entry.getInfo().getId());
-        expected -= numBlocksPerFile;
-        waitForCachedBlocks(namenode, expected, expected,
-            "testWaitForCachedReplicas:2");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return ((namenode.getNamesystem().getCacheCapacity() ==
+            (NUM_DATANODES * CACHE_CAPACITY)) &&
+              (namenode.getNamesystem().getCacheUsed() == 0));
       }
-    } finally {
-      cluster.shutdown();
-    }
-  }
+    }, 500, 60000);
 
-  @Test(timeout=120000)
-  public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
-      throws Exception {
-    HdfsConfiguration conf = createCachingConf();
-    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
-      // Create the pool
-      String pool = "pool1";
-      namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
-      // Create some test files
-      final int numFiles = 2;
-      final int numBlocksPerFile = 2;
-      final List<String> paths = new ArrayList<String>(numFiles);
-      for (int i=0; i<numFiles; i++) {
-        Path p = new Path("/testCachePaths-" + i);
-        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
-            (int)BLOCK_SIZE);
-        paths.add(p.toUri().getPath());
-      }
-      // Check the initial statistics at the namenode
-      waitForCachedBlocks(namenode, 0, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
-      // Cache and check each path in sequence
-      int expected = 0;
-      for (int i=0; i<numFiles; i++) {
-        CacheDirectiveInfo directive =
-            new CacheDirectiveInfo.Builder().
-              setPath(new Path(paths.get(i))).
-              setPool(pool).
-              build();
-        dfs.addCacheDirective(directive);
-        waitForCachedBlocks(namenode, expected, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
-      }
-      Thread.sleep(20000);
-      waitForCachedBlocks(namenode, expected, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
-    } finally {
-      cluster.shutdown();
+    // Send a cache report referring to a bogus block.  It is important that
+    // the NameNode be robust against this.
+    NamenodeProtocols nnRpc = namenode.getRpcServer();
+    DataNode dn0 = cluster.getDataNodes().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    LinkedList<Long> bogusBlockIds = new LinkedList<Long> ();
+    bogusBlockIds.add(999999L);
+    nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds);
+
+    Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+    // Create the pool
+    final String pool = "friendlyPool";
+    nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
+    // Create some test files
+    final int numFiles = 2;
+    final int numBlocksPerFile = 2;
+    final List<String> paths = new ArrayList<String>(numFiles);
+    for (int i=0; i<numFiles; i++) {
+      Path p = new Path(rootDir, "testCachePaths-" + i);
+      FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+          (int)BLOCK_SIZE);
+      paths.add(p.toUri().getPath());
+    }
+    // Check the initial statistics at the namenode
+    waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
+    // Cache and check each path in sequence
+    int expected = 0;
+    for (int i=0; i<numFiles; i++) {
+      CacheDirectiveInfo directive =
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path(paths.get(i))).
+            setPool(pool).
+            build();
+      nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class));
+      expected += numBlocksPerFile;
+      waitForCachedBlocks(namenode, expected, expected,
+          "testWaitForCachedReplicas:1");
+    }
+
+    // Check that the datanodes have the right cache values
+    DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+    assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length);
+    long totalUsed = 0;
+    for (DatanodeInfo dn : live) {
+      final long cacheCapacity = dn.getCacheCapacity();
+      final long cacheUsed = dn.getCacheUsed();
+      final long cacheRemaining = dn.getCacheRemaining();
+      assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+      assertEquals("Capacity not equal to used + remaining",
+          cacheCapacity, cacheUsed + cacheRemaining);
+      assertEquals("Remaining not equal to capacity - used",
+          cacheCapacity - cacheUsed, cacheRemaining);
+      totalUsed += cacheUsed;
+    }
+    assertEquals(expected*BLOCK_SIZE, totalUsed);
+
+    // Uncache and check each path in sequence
+    RemoteIterator<CacheDirectiveEntry> entries =
+      new CacheDirectiveIterator(nnRpc, null);
+    for (int i=0; i<numFiles; i++) {
+      CacheDirectiveEntry entry = entries.next();
+      nnRpc.removeCacheDirective(entry.getInfo().getId());
+      expected -= numBlocksPerFile;
+      waitForCachedBlocks(namenode, expected, expected,
+          "testWaitForCachedReplicas:2");
     }
   }
 
   @Test(timeout=120000)
   public void testWaitForCachedReplicasInDirectory() throws Exception {
-    HdfsConfiguration conf = createCachingConf();
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
-      // Create the pool
-      final String pool = "friendlyPool";
-      final CachePoolInfo poolInfo = new CachePoolInfo(pool);
-      dfs.addCachePool(poolInfo);
-      // Create some test files
-      final List<Path> paths = new LinkedList<Path>();
-      paths.add(new Path("/foo/bar"));
-      paths.add(new Path("/foo/baz"));
-      paths.add(new Path("/foo2/bar2"));
-      paths.add(new Path("/foo2/baz2"));
-      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
-      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
-      final int numBlocksPerFile = 2;
-      for (Path path : paths) {
-        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
-            (int)BLOCK_SIZE, (short)3, false);
-      }
-      waitForCachedBlocks(namenode, 0, 0,
-          "testWaitForCachedReplicasInDirectory:0");
+    // Create the pool
+    final String pool = "friendlyPool";
+    final CachePoolInfo poolInfo = new CachePoolInfo(pool);
+    dfs.addCachePool(poolInfo);
+    // Create some test files
+    final List<Path> paths = new LinkedList<Path>();
+    paths.add(new Path("/foo/bar"));
+    paths.add(new Path("/foo/baz"));
+    paths.add(new Path("/foo2/bar2"));
+    paths.add(new Path("/foo2/baz2"));
+    dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+    dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+    final int numBlocksPerFile = 2;
+    for (Path path : paths) {
+      FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+          (int)BLOCK_SIZE, (short)3, false);
+    }
+    waitForCachedBlocks(namenode, 0, 0,
+        "testWaitForCachedReplicasInDirectory:0");
 
-      // cache entire directory
-      long id = dfs.addCacheDirective(
-            new CacheDirectiveInfo.Builder().
-              setPath(new Path("/foo")).
-              setReplication((short)2).
-              setPool(pool).
-              build());
-      waitForCachedBlocks(namenode, 4, 8,
-          "testWaitForCachedReplicasInDirectory:1:blocks");
-      // Verify that listDirectives gives the stats we want.
-      waitForCacheDirectiveStats(dfs,
-          4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
-          2, 2,
-          new CacheDirectiveInfo.Builder().
-              setPath(new Path("/foo")).
-              build(),
-          "testWaitForCachedReplicasInDirectory:1:directive");
-      waitForCachePoolStats(dfs,
-          4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
-          2, 2,
-          poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
-
-      long id2 = dfs.addCacheDirective(
-            new CacheDirectiveInfo.Builder().
-              setPath(new Path("/foo/bar")).
-              setReplication((short)4).
-              setPool(pool).
-              build());
-      // wait for an additional 2 cached replicas to come up
-      waitForCachedBlocks(namenode, 4, 10,
-          "testWaitForCachedReplicasInDirectory:2:blocks");
-      // the directory directive's stats are unchanged
-      waitForCacheDirectiveStats(dfs,
-          4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
-          2, 2,
+    // cache entire directory
+    long id = dfs.addCacheDirective(
           new CacheDirectiveInfo.Builder().
-              setPath(new Path("/foo")).
-              build(),
-          "testWaitForCachedReplicasInDirectory:2:directive-1");
-      // verify /foo/bar's stats
-      waitForCacheDirectiveStats(dfs,
-          4 * numBlocksPerFile * BLOCK_SIZE,
-          // only 3 because the file only has 3 replicas, not 4 as requested.
-          3 * numBlocksPerFile * BLOCK_SIZE,
-          1,
-          // only 0 because the file can't be fully cached
-          0,
+            setPath(new Path("/foo")).
+            setReplication((short)2).
+            setPool(pool).
+            build());
+    waitForCachedBlocks(namenode, 4, 8,
+        "testWaitForCachedReplicasInDirectory:1:blocks");
+    // Verify that listDirectives gives the stats we want.
+    waitForCacheDirectiveStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+        2, 2,
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            build(),
+        "testWaitForCachedReplicasInDirectory:1:directive");
+    waitForCachePoolStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+        2, 2,
+        poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
+
+    long id2 = dfs.addCacheDirective(
           new CacheDirectiveInfo.Builder().
-              setPath(new Path("/foo/bar")).
-              build(),
-          "testWaitForCachedReplicasInDirectory:2:directive-2");
-      waitForCachePoolStats(dfs,
-          (4+4) * numBlocksPerFile * BLOCK_SIZE,
-          (4+3) * numBlocksPerFile * BLOCK_SIZE,
-          3, 2,
-          poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
-
-      // remove and watch numCached go to 0
-      dfs.removeCacheDirective(id);
-      dfs.removeCacheDirective(id2);
-      waitForCachedBlocks(namenode, 0, 0,
-          "testWaitForCachedReplicasInDirectory:3:blocks");
-      waitForCachePoolStats(dfs,
-          0, 0,
-          0, 0,
-          poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
-    } finally {
-      cluster.shutdown();
-    }
+            setPath(new Path("/foo/bar")).
+            setReplication((short)4).
+            setPool(pool).
+            build());
+    // wait for an additional 2 cached replicas to come up
+    waitForCachedBlocks(namenode, 4, 10,
+        "testWaitForCachedReplicasInDirectory:2:blocks");
+    // the directory directive's stats are unchanged
+    waitForCacheDirectiveStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+        2, 2,
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            build(),
+        "testWaitForCachedReplicasInDirectory:2:directive-1");
+    // verify /foo/bar's stats
+    waitForCacheDirectiveStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE,
+        // only 3 because the file only has 3 replicas, not 4 as requested.
+        3 * numBlocksPerFile * BLOCK_SIZE,
+        1,
+        // only 0 because the file can't be fully cached
+        0,
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo/bar")).
+            build(),
+        "testWaitForCachedReplicasInDirectory:2:directive-2");
+    waitForCachePoolStats(dfs,
+        (4+4) * numBlocksPerFile * BLOCK_SIZE,
+        (4+3) * numBlocksPerFile * BLOCK_SIZE,
+        3, 2,
+        poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
+    // remove and watch numCached go to 0
+    dfs.removeCacheDirective(id);
+    dfs.removeCacheDirective(id2);
+    waitForCachedBlocks(namenode, 0, 0,
+        "testWaitForCachedReplicasInDirectory:3:blocks");
+    waitForCachePoolStats(dfs,
+        0, 0,
+        0, 0,
+        poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
   }
 
   /**
@@ -1000,68 +961,57 @@ public class TestCacheDirectives {
    */
   @Test(timeout=120000)
   public void testReplicationFactor() throws Exception {
-    HdfsConfiguration conf = createCachingConf();
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
-      // Create the pool
-      final String pool = "friendlyPool";
-      dfs.addCachePool(new CachePoolInfo(pool));
-      // Create some test files
-      final List<Path> paths = new LinkedList<Path>();
-      paths.add(new Path("/foo/bar"));
-      paths.add(new Path("/foo/baz"));
-      paths.add(new Path("/foo2/bar2"));
-      paths.add(new Path("/foo2/baz2"));
-      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
-      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
-      final int numBlocksPerFile = 2;
-      for (Path path : paths) {
-        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
-            (int)BLOCK_SIZE, (short)3, false);
-      }
-      waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
-      checkNumCachedReplicas(dfs, paths, 0, 0);
-      // cache directory
-      long id = dfs.addCacheDirective(
+    // Create the pool
+    final String pool = "friendlyPool";
+    dfs.addCachePool(new CachePoolInfo(pool));
+    // Create some test files
+    final List<Path> paths = new LinkedList<Path>();
+    paths.add(new Path("/foo/bar"));
+    paths.add(new Path("/foo/baz"));
+    paths.add(new Path("/foo2/bar2"));
+    paths.add(new Path("/foo2/baz2"));
+    dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+    dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+    final int numBlocksPerFile = 2;
+    for (Path path : paths) {
+      FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+          (int)BLOCK_SIZE, (short)3, false);
+    }
+    waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
+    checkNumCachedReplicas(dfs, paths, 0, 0);
+    // cache directory
+    long id = dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().
+          setPath(new Path("/foo")).
+          setReplication((short)1).
+          setPool(pool).
+          build());
+    waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
+    checkNumCachedReplicas(dfs, paths, 4, 4);
+    // step up the replication factor
+    for (int i=2; i<=3; i++) {
+      dfs.modifyCacheDirective(
           new CacheDirectiveInfo.Builder().
-            setPath(new Path("/foo")).
-            setReplication((short)1).
-            setPool(pool).
-            build());
-      waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
-      checkNumCachedReplicas(dfs, paths, 4, 4);
-      // step up the replication factor
-      for (int i=2; i<=3; i++) {
-        dfs.modifyCacheDirective(
-            new CacheDirectiveInfo.Builder().
-            setId(id).
-            setReplication((short)i).
-            build());
-        waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
-        checkNumCachedReplicas(dfs, paths, 4, 4*i);
-      }
-      // step it down
-      for (int i=2; i>=1; i--) {
-        dfs.modifyCacheDirective(
-            new CacheDirectiveInfo.Builder().
-            setId(id).
-            setReplication((short)i).
-            build());
-        waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
-        checkNumCachedReplicas(dfs, paths, 4, 4*i);
-      }
-      // remove and watch numCached go to 0
-      dfs.removeCacheDirective(id);
-      waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
-      checkNumCachedReplicas(dfs, paths, 0, 0);
-    } finally {
-      cluster.shutdown();
+          setId(id).
+          setReplication((short)i).
+          build());
+      waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
+      checkNumCachedReplicas(dfs, paths, 4, 4*i);
     }
+    // step it down
+    for (int i=2; i>=1; i--) {
+      dfs.modifyCacheDirective(
+          new CacheDirectiveInfo.Builder().
+          setId(id).
+          setReplication((short)i).
+          build());
+      waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
+      checkNumCachedReplicas(dfs, paths, 4, 4*i);
+    }
+    // remove and watch numCached go to 0
+    dfs.removeCacheDirective(id);
+    waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
+    checkNumCachedReplicas(dfs, paths, 0, 0);
   }
 
   @Test(timeout=60000)
@@ -1081,11 +1031,12 @@ public class TestCacheDirectives {
     assertNull("Unexpected owner name", info.getOwnerName());
     assertNull("Unexpected group name", info.getGroupName());
     assertNull("Unexpected mode", info.getMode());
-    assertNull("Unexpected weight", info.getWeight());
+    assertNull("Unexpected limit", info.getLimit());
     // Modify the pool so myuser is now the owner
+    final long limit = 99;
     dfs.modifyCachePool(new CachePoolInfo(poolName)
         .setOwnerName(myUser.getShortUserName())
-        .setWeight(99));
+        .setLimit(limit));
     // Should see full info
     it = myDfs.listCachePools();
     info = it.next().getInfo();
@@ -1096,60 +1047,308 @@ public class TestCacheDirectives {
     assertNotNull("Expected group name", info.getGroupName());
     assertEquals("Mismatched mode", (short) 0700,
         info.getMode().toShort());
-    assertEquals("Mismatched weight", 99, (int)info.getWeight());
+    assertEquals("Mismatched limit", limit, (long)info.getLimit());
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=120000)
   public void testExpiry() throws Exception {
-    HdfsConfiguration conf = createCachingConf();
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-    try {
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      String pool = "pool1";
-      dfs.addCachePool(new CachePoolInfo(pool));
-      Path p = new Path("/mypath");
-      DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
-      // Expire after test timeout
-      Date start = new Date();
-      Date expiry = DateUtils.addSeconds(start, 120);
-      final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
-          .setPath(p)
-          .setPool(pool)
-          .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
-          .setReplication((short)2)
-          .build());
-      waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
-      // Change it to expire sooner
-      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
-          .setExpiration(Expiration.newRelative(0)).build());
-      waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
-      RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
-      CacheDirectiveEntry ent = it.next();
-      assertFalse(it.hasNext());
-      Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
-      assertTrue("Directive should have expired",
-          entryExpiry.before(new Date()));
-      // Change it back to expire later
+    String pool = "pool1";
+    dfs.addCachePool(new CachePoolInfo(pool));
+    Path p = new Path("/mypath");
+    DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
+    // Expire after test timeout
+    Date start = new Date();
+    Date expiry = DateUtils.addSeconds(start, 120);
+    final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPath(p)
+        .setPool(pool)
+        .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
+        .setReplication((short)2)
+        .build());
+    waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
+    // Change it to expire sooner
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+        .setExpiration(Expiration.newRelative(0)).build());
+    waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
+    RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
+    CacheDirectiveEntry ent = it.next();
+    assertFalse(it.hasNext());
+    Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+    assertTrue("Directive should have expired",
+        entryExpiry.before(new Date()));
+    // Change it back to expire later
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+        .setExpiration(Expiration.newRelative(120000)).build());
+    waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
+    it = dfs.listCacheDirectives(null);
+    ent = it.next();
+    assertFalse(it.hasNext());
+    entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+    assertTrue("Directive should not have expired",
+        entryExpiry.after(new Date()));
+    // Verify that setting a negative TTL throws an error
+    try {
       dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
-          .setExpiration(Expiration.newRelative(120000)).build());
-      waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
-      it = dfs.listCacheDirectives(null);
-      ent = it.next();
-      assertFalse(it.hasNext());
-      entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
-      assertTrue("Directive should not have expired",
-          entryExpiry.after(new Date()));
-      // Verify that setting a negative TTL throws an error
-      try {
-        dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
-            .setExpiration(Expiration.newRelative(-1)).build());
-      } catch (InvalidRequestException e) {
-        GenericTestUtils
-            .assertExceptionContains("Cannot set a negative expiration", e);
-      }
-    } finally {
-      cluster.shutdown();
+          .setExpiration(Expiration.newRelative(-1)).build());
+    } catch (InvalidRequestException e) {
+      GenericTestUtils
+          .assertExceptionContains("Cannot set a negative expiration", e);
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testLimit() throws Exception {
+    try {
+      dfs.addCachePool(new CachePoolInfo("poolofnegativity").setLimit(-99l));
+      fail("Should not be able to set a negative limit");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("negative", e);
+    }
+    final String destiny = "poolofdestiny";
+    final Path path1 = new Path("/destiny");
+    DFSTestUtil.createFile(dfs, path1, 2*BLOCK_SIZE, (short)1, 0x9494);
+    // Start off with a limit that is too small
+    final CachePoolInfo poolInfo = new CachePoolInfo(destiny)
+        .setLimit(2*BLOCK_SIZE-1);
+    dfs.addCachePool(poolInfo);
+    final CacheDirectiveInfo info1 = new CacheDirectiveInfo.Builder()
+        .setPool(destiny).setPath(path1).build();
+    try {
+      dfs.addCacheDirective(info1);
+      fail("Should not be able to cache when there is no more limit");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("remaining capacity", e);
+    }
+    // Raise the limit up to fit and it should work this time
+    poolInfo.setLimit(2*BLOCK_SIZE);
+    dfs.modifyCachePool(poolInfo);
+    long id1 = dfs.addCacheDirective(info1);
+    waitForCachePoolStats(dfs,
+        2*BLOCK_SIZE, 2*BLOCK_SIZE,
+        1, 1,
+        poolInfo, "testLimit:1");
+    // Adding another file, it shouldn't be cached
+    final Path path2 = new Path("/failure");
+    DFSTestUtil.createFile(dfs, path2, BLOCK_SIZE, (short)1, 0x9495);
+    try {
+      dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+          .setPool(destiny).setPath(path2).build(),
+          EnumSet.noneOf(CacheFlag.class));
+      fail("Should not be able to add another cached file");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("remaining capacity", e);
+    }
+    // Bring the limit down, the first file should get uncached
+    poolInfo.setLimit(BLOCK_SIZE);
+    dfs.modifyCachePool(poolInfo);
+    waitForCachePoolStats(dfs,
+        2*BLOCK_SIZE, 0,
+        1, 0,
+        poolInfo, "testLimit:2");
+    RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
+    assertTrue("Expected a cache pool", it.hasNext());
+    CachePoolStats stats = it.next().getStats();
+    assertEquals("Overlimit bytes should be difference of needed and limit",
+        BLOCK_SIZE, stats.getBytesOverlimit());
+    // Moving a directive to a pool without enough limit should fail
+    CachePoolInfo inadequate =
+        new CachePoolInfo("poolofinadequacy").setLimit(BLOCK_SIZE);
+    dfs.addCachePool(inadequate);
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1)
+          .setId(id1).setPool(inadequate.getPoolName()).build(),
+          EnumSet.noneOf(CacheFlag.class));
+    } catch(InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("remaining capacity", e);
+    }
+    // Succeeds when force=true
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1).setId(id1)
+        .setPool(inadequate.getPoolName()).build(),
+        EnumSet.of(CacheFlag.FORCE));
+    // Also can add with force=true
+    dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName())
+            .setPath(path1).build(), EnumSet.of(CacheFlag.FORCE));
+  }
+
+  @Test(timeout=30000)
+  public void testMaxRelativeExpiry() throws Exception {
+    // Test that negative and really big max expirations can't be set during add
+    try {
+      dfs.addCachePool(new CachePoolInfo("failpool").setMaxRelativeExpiryMs(-1l));
+      fail("Added a pool with a negative max expiry.");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("negative", e);
+    }
+    try {
+      dfs.addCachePool(new CachePoolInfo("failpool")
+          .setMaxRelativeExpiryMs(Long.MAX_VALUE - 1));
+      fail("Added a pool with too big of a max expiry.");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("too big", e);
+    }
+    // Test that setting a max relative expiry on a pool works
+    CachePoolInfo coolPool = new CachePoolInfo("coolPool");
+    final long poolExpiration = 1000 * 60 * 10l;
+    dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(poolExpiration));
+    RemoteIterator<CachePoolEntry> poolIt = dfs.listCachePools();
+    CachePoolInfo listPool = poolIt.next().getInfo();
+    assertFalse("Should only be one pool", poolIt.hasNext());
+    assertEquals("Expected max relative expiry to match set value",
+        poolExpiration, listPool.getMaxRelativeExpiryMs().longValue());
+    // Test that negative and really big max expirations can't be modified
+    try {
+      dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(-1l));
+      fail("Added a pool with a negative max expiry.");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("negative", e);
+    }
+    try {
+      dfs.modifyCachePool(coolPool
+          .setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER+1));
+      fail("Added a pool with too big of a max expiry.");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("too big", e);
+    }
+    // Test that adding a directive without an expiration uses the pool's max
+    CacheDirectiveInfo defaultExpiry = new CacheDirectiveInfo.Builder()
+        .setPath(new Path("/blah"))
+        .setPool(coolPool.getPoolName())
+        .build();
+    dfs.addCacheDirective(defaultExpiry);
+    RemoteIterator<CacheDirectiveEntry> dirIt =
+        dfs.listCacheDirectives(defaultExpiry);
+    CacheDirectiveInfo listInfo = dirIt.next().getInfo();
+    assertFalse("Should only have one entry in listing", dirIt.hasNext());
+    long listExpiration = listInfo.getExpiration().getAbsoluteMillis()
+        - new Date().getTime();
+    assertTrue("Directive expiry should be approximately the pool's max expiry",
+        Math.abs(listExpiration - poolExpiration) < 10*1000);
+    // Test that the max is enforced on add for relative and absolute
+    CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder()
+        .setPath(new Path("/lolcat"))
+        .setPool(coolPool.getPoolName());
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newRelative(poolExpiration+1))
+          .build());
+      fail("Added a directive that exceeds pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newAbsolute(
+              new Date().getTime() + poolExpiration + (10*1000)))
+          .build());
+      fail("Added a directive that exceeds pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    // Test that max is enforced on modify for relative and absolute Expirations
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.newRelative(poolExpiration+1))
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.newAbsolute(
+              new Date().getTime() + poolExpiration + (10*1000)))
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    // Test some giant expiration values with add
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newRelative(
+              Long.MAX_VALUE))
+          .build());
+      fail("Added a directive with a gigantic max value");
+    } catch (IllegalArgumentException e) {
+      assertExceptionContains("is too far in the future", e);
+    }
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newAbsolute(
+              Long.MAX_VALUE))
+          .build());
+      fail("Added a directive with a gigantic max value");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("is too far in the future", e);
+    }
+    // Test some giant expiration values with modify
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.NEVER)
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.newAbsolute(
+              Long.MAX_VALUE))
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("is too far in the future", e);
+    }
+    // Test that the max is enforced on modify correctly when changing pools
+    CachePoolInfo destPool = new CachePoolInfo("destPool");
+    dfs.addCachePool(destPool.setMaxRelativeExpiryMs(poolExpiration / 2));
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setPool(destPool.getPoolName())
+          .build());
+      fail("Modified a directive to a pool with a lower max expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
     }
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+        .setId(listInfo.getId())
+        .setPool(destPool.getPoolName())
+        .setExpiration(Expiration.newRelative(poolExpiration / 2))
+        .build());
+    dirIt = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder()
+        .setPool(destPool.getPoolName())
+        .build());
+    listInfo = dirIt.next().getInfo();
+    listExpiration = listInfo.getExpiration().getAbsoluteMillis()
+        - new Date().getTime();
+    assertTrue("Unexpected relative expiry " + listExpiration
+        + " expected approximately " + poolExpiration/2,
+        Math.abs(poolExpiration/2 - listExpiration) < 10*1000);
+    // Test that cache pool and directive expiry can be modified back to never
+    dfs.modifyCachePool(destPool
+        .setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER));
+    poolIt = dfs.listCachePools();
+    listPool = poolIt.next().getInfo();
+    while (!listPool.getPoolName().equals(destPool.getPoolName())) {
+      listPool = poolIt.next().getInfo();
+    }
+    assertEquals("Expected max relative expiry to match set value",
+        CachePoolInfo.RELATIVE_EXPIRY_NEVER,
+        listPool.getMaxRelativeExpiryMs().longValue());
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
+        .setId(listInfo.getId())
+        .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER))
+        .build());
+    // Test modifying close to the limit
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
+        .setId(listInfo.getId())
+        .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER - 1))
+        .build());
   }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Fri Jan  3 07:26:52 2014
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.na
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -72,6 +75,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
@@ -106,6 +110,7 @@ public class TestCheckpoint {
   }
 
   static final Log LOG = LogFactory.getLog(TestCheckpoint.class); 
+  static final String NN_METRICS = "NameNodeActivity";
   
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 4096;
@@ -1048,6 +1053,14 @@ public class TestCheckpoint {
       //
       secondary = startSecondaryNameNode(conf);
       secondary.doCheckpoint();
+
+      MetricsRecordBuilder rb = getMetrics(NN_METRICS);
+      assertCounterGt("GetImageNumOps", 0, rb);
+      assertCounterGt("GetEditNumOps", 0, rb);
+      assertCounterGt("PutImageNumOps", 0, rb);
+      assertGaugeGt("GetImageAvgTime", 0.0, rb);
+      assertGaugeGt("GetEditAvgTime", 0.0, rb);
+      assertGaugeGt("PutImageAvgTime", 0.0, rb);
     } finally {
       fileSys.close();
       cleanup(secondary);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java Fri Jan  3 07:26:52 2014
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class TestCommitBlockSynchronizat
       throws IOException {
     Configuration conf = new Configuration();
     FSImage image = new FSImage(conf);
-    DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+    final DatanodeStorageInfo[] targets = {};
 
     FSNamesystem namesystem = new FSNamesystem(conf, image);
     FSNamesystem namesystemSpy = spy(namesystem);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Fri Jan  3 07:26:52 2014
@@ -104,11 +104,11 @@ public class TestDeadDatanode {
     DatanodeRegistration reg = 
       DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
       
-    waitForDatanodeState(reg.getStorageID(), true, 20000);
+    waitForDatanodeState(reg.getDatanodeUuid(), true, 20000);
 
     // Shutdown and wait for datanode to be marked dead
     dn.shutdown();
-    waitForDatanodeState(reg.getStorageID(), false, 20000);
+    waitForDatanodeState(reg.getDatanodeUuid(), false, 20000);
 
     DatanodeProtocol dnp = cluster.getNameNodeRpc();
     
@@ -117,7 +117,7 @@ public class TestDeadDatanode {
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
         null) };
     StorageReceivedDeletedBlocks[] storageBlocks = { 
-        new StorageReceivedDeletedBlocks(reg.getStorageID(), blocks) };
+        new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
     
     // Ensure blockReceived call from dead datanode is rejected with IOException
     try {
@@ -129,7 +129,7 @@ public class TestDeadDatanode {
 
     // Ensure blockReport from dead datanode is rejected with IOException
     StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(reg.getStorageID()),
+        new DatanodeStorage(reg.getDatanodeUuid()),
         new long[] { 0L, 0L, 0L }) };
     try {
       dnp.blockReport(reg, poolId, report);
@@ -140,7 +140,7 @@ public class TestDeadDatanode {
 
     // Ensure heartbeat from dead datanode is rejected with a command
     // that asks datanode to register again
-    StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0,
+    StorageReport[] rep = { new StorageReport(reg.getDatanodeUuid(), false, 0, 0,
         0, 0) };
     DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
         .getCommands();

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Fri Jan  3 07:26:52 2014
@@ -31,6 +31,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
@@ -383,4 +384,33 @@ public class TestFSEditLogLoader {
     assertTrue(!validation.hasCorruptHeader());
     assertEquals(HdfsConstants.INVALID_TXID, validation.getEndTxId());
   }
+
+  private static final Map<Byte, FSEditLogOpCodes> byteToEnum =
+      new HashMap<Byte, FSEditLogOpCodes>();
+  static {
+    for(FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
+      byteToEnum.put(opCode.getOpCode(), opCode);
+    }
+  }
+
+  private static FSEditLogOpCodes fromByte(byte opCode) {
+    return byteToEnum.get(opCode);
+  }
+
+  @Test
+  public void testFSEditLogOpCodes() throws IOException {
+    //try all codes
+    for(FSEditLogOpCodes c : FSEditLogOpCodes.values()) {
+      final byte code = c.getOpCode();
+      assertEquals("c=" + c + ", code=" + code,
+          c, FSEditLogOpCodes.fromByte(code));
+    }
+
+    //try all byte values
+    for(int b = 0; b < (1 << Byte.SIZE); b++) {
+      final byte code = (byte)b;
+      assertEquals("b=" + b + ", code=" + code,
+          fromByte(code), FSEditLogOpCodes.fromByte(code));
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java Fri Jan  3 07:26:52 2014
@@ -41,8 +41,8 @@ import org.apache.hadoop.hdfs.client.Hdf
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.log4j.Level;

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java Fri Jan  3 07:26:52 2014
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -780,7 +781,7 @@ public class TestINodeFile {
       }
       System.out.println("Adding component " + DFSUtil.bytes2String(component));
       dir = new INodeDirectory(++id, component, permstatus, 0);
-      prev.addChild(dir, false, null, null);
+      prev.addChild(dir, false, null);
       prev = dir;
     }
     return dir; // Last Inode in the chain
@@ -921,6 +922,7 @@ public class TestINodeFile {
   public void testDotdotInodePath() throws Exception {
     final Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;
+    DFSClient client = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
@@ -933,7 +935,7 @@ public class TestINodeFile {
       long parentId = fsdir.getINode("/").getId();
       String testPath = "/.reserved/.inodes/" + dirId + "/..";
 
-      DFSClient client = new DFSClient(NameNode.getAddress(conf), conf);
+      client = new DFSClient(NameNode.getAddress(conf), conf);
       HdfsFileStatus status = client.getFileInfo(testPath);
       assertTrue(parentId == status.getFileId());
       
@@ -943,6 +945,7 @@ public class TestINodeFile {
       assertTrue(parentId == status.getFileId());
       
     } finally {
+      IOUtils.cleanup(LOG, client);
       if (cluster != null) {
         cluster.shutdown();
       }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java Fri Jan  3 07:26:52 2014
@@ -55,7 +55,7 @@ public class TestListCorruptFileBlocks {
   static Log LOG = NameNode.stateChangeLog;
 
   /** check if nn.getCorruptFiles() returns a file that has corrupted blocks */
-  @Test
+  @Test (timeout=300000)
   public void testListCorruptFilesCorruptedBlock() throws Exception {
     MiniDFSCluster cluster = null;
     Random random = new Random();
@@ -131,7 +131,7 @@ public class TestListCorruptFileBlocks {
   /**
    * Check that listCorruptFileBlocks works while the namenode is still in safemode.
    */
-  @Test
+  @Test (timeout=300000)
   public void testListCorruptFileBlocksInSafeMode() throws Exception {
     MiniDFSCluster cluster = null;
     Random random = new Random();
@@ -262,7 +262,7 @@ public class TestListCorruptFileBlocks {
   }
   
   // deliberately remove blocks from a file and validate the list-corrupt-file-blocks API
-  @Test
+  @Test (timeout=300000)
   public void testlistCorruptFileBlocks() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
@@ -372,7 +372,7 @@ public class TestListCorruptFileBlocks {
   /**
    * test listCorruptFileBlocks in DistributedFileSystem
    */
-  @Test
+  @Test (timeout=300000)
   public void testlistCorruptFileBlocksDFS() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
@@ -446,7 +446,7 @@ public class TestListCorruptFileBlocks {
    * Also, test that DFS.listCorruptFileBlocks can make multiple successive
    * calls.
    */
-  @Test
+  @Test (timeout=300000)
   public void testMaxCorruptFiles() throws Exception {
     MiniDFSCluster cluster = null;
     try {

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java Fri Jan  3 07:26:52 2014
@@ -316,10 +316,11 @@ public class TestNamenodeRetryCache {
     ExtendedBlock oldBlock = new ExtendedBlock();
     ExtendedBlock newBlock = new ExtendedBlock();
     DatanodeID[] newNodes = new DatanodeID[2];
+    String[] newStorages = new String[2];
     
     newCall();
     try {
-      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes);
+      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes, newStorages);
       fail("Expect StandbyException from the updatePipeline call");
     } catch (StandbyException e) {
       // expected, since in the beginning both nn are in standby state
@@ -329,7 +330,7 @@ public class TestNamenodeRetryCache {
     
     cluster.transitionToActive(0);
     try {
-      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes);
+      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes, newStorages);
     } catch (IOException e) {
       // ignore call should not hang.
     }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java Fri Jan  3 07:26:52 2014
@@ -31,10 +31,10 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -57,7 +57,7 @@ public class TestSnapshotPathINodes {
   static private DistributedFileSystem hdfs;
 
   @BeforeClass
-  static public void setUp() throws Exception {
+  public static void setUp() throws Exception {
     conf = new Configuration();
     cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(REPLICATION)
@@ -68,12 +68,16 @@ public class TestSnapshotPathINodes {
     fsdir = fsn.getFSDirectory();
     
     hdfs = cluster.getFileSystem();
+  }
+
+  @Before
+  public void reset() throws Exception {
     DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, file2, 1024, REPLICATION, seed);
   }
 
   @AfterClass
-  static public void tearDown() throws Exception {
+  public static void tearDown() throws Exception {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -201,8 +205,7 @@ public class TestSnapshotPathINodes {
     // Check the INode for file1 (snapshot file)
     INode snapshotFileNode = inodes[inodes.length - 1]; 
     assertINodeFile(snapshotFileNode, file1);
-    assertTrue(snapshotFileNode.getParent() instanceof 
-        INodeDirectoryWithSnapshot);
+    assertTrue(snapshotFileNode.getParent().isWithSnapshot());
     
     // Call getExistingPathINodes and request only one INode.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
@@ -251,6 +254,8 @@ public class TestSnapshotPathINodes {
         System.out.println("The exception is expected: " + fnfe);
       }
     }
+    hdfs.deleteSnapshot(sub1, "s1");
+    hdfs.disallowSnapshot(sub1);
   }
   
   /** 
@@ -308,6 +313,8 @@ public class TestSnapshotPathINodes {
         sub1.toString());
     assertEquals(inodes[components.length - 3].getFullPathName(),
         dir.toString());
+    hdfs.deleteSnapshot(sub1, "s2");
+    hdfs.disallowSnapshot(sub1);
   }
   
   static private Snapshot s4;
@@ -367,6 +374,8 @@ public class TestSnapshotPathINodes {
         sub1.toString());
     assertEquals(inodes[components.length - 3].getFullPathName(),
         dir.toString());
+    hdfs.deleteSnapshot(sub1, "s4");
+    hdfs.disallowSnapshot(sub1);
   }
   
   /** 
@@ -375,9 +384,6 @@ public class TestSnapshotPathINodes {
    */
   @Test (timeout=15000)
   public void testSnapshotPathINodesAfterModification() throws Exception {
-    //file1 was deleted, create it again.
-    DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);
-
     // First check the INode for /TestSnapshot/sub1/file1
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
@@ -385,7 +391,6 @@ public class TestSnapshotPathINodes {
     INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
     assertEquals(inodes.length, components.length);
-    assertSnapshot(nodesInPath, false, s4, -1);
 
     // The last INode should be associated with file1
     assertEquals(inodes[components.length - 1].getFullPathName(),
@@ -434,5 +439,7 @@ public class TestSnapshotPathINodes {
     assertEquals(newInodes[last].getFullPathName(), file1.toString());
     // The modification time of the INode for file3 should have been changed
     Assert.assertFalse(modTime == newInodes[last].getModificationTime());
+    hdfs.deleteSnapshot(sub1, "s3");
+    hdfs.disallowSnapshot(sub1);
   }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.LinkedList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -59,6 +60,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 /**
  * Tests state transition from active->standby, and manual failover
  * and failback between two namenodes.
@@ -124,6 +127,17 @@ public class TestHAStateTransitions {
     }
   }
 
+  private void addCrmThreads(MiniDFSCluster cluster,
+      LinkedList<Thread> crmThreads) {
+    for (int nn = 0; nn <= 1; nn++) {
+      Thread thread = cluster.getNameNode(nn).getNamesystem().
+          getCacheManager().getCacheReplicationMonitor();
+      if (thread != null) {
+        crmThreads.add(thread);
+      }
+    }
+  }
+
   /**
    * Test that transitioning a service to the state that it is already
    * in is a nop, specifically, an exception is not thrown.
@@ -131,19 +145,30 @@ public class TestHAStateTransitions {
   @Test
   public void testTransitionToCurrentStateIsANop() throws Exception {
     Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(1)
       .build();
+    LinkedList<Thread> crmThreads = new LinkedList<Thread>();
     try {
       cluster.waitActive();
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
     } finally {
       cluster.shutdown();
     }
+    // Verify that all cacheReplicationMonitor threads shut down
+    for (Thread thread : crmThreads) {
+      Uninterruptibles.joinUninterruptibly(thread);
+    }
   }
 
   /**

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Fri Jan  3 07:26:52 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -523,16 +524,17 @@ public class TestPipelinesFailover {
       (BlockInfoUnderConstruction)storedBlock;
     // We expect that the replica with the most recent heart beat will be
     // the one to be in charge of the synchronization / recovery protocol.
-    DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
-    DatanodeDescriptor expectedPrimary = datanodes[0];
-    long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
-    for (int i = 1; i < datanodes.length; i++) {
-      if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
-        expectedPrimary = datanodes[i];
-        mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+    final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
+    DatanodeStorageInfo expectedPrimary = storages[0];
+    long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
+    for (int i = 1; i < storages.length; i++) {
+      final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
+      if (lastUpdate > mostRecentLastUpdate) {
+        expectedPrimary = storages[i];
+        mostRecentLastUpdate = lastUpdate;
       }
     }
-    return expectedPrimary;
+    return expectedPrimary.getDatanodeDescriptor();
   }
 
   private DistributedFileSystem createFsAsOtherUser(

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Fri Jan  3 07:26:52 2014
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -715,9 +716,10 @@ public class TestRetryCacheWithHA {
       DatanodeInfo[] newNodes = new DatanodeInfo[2];
       newNodes[0] = nodes[0];
       newNodes[1] = nodes[1];
+      String[] storageIDs = {"s0", "s1"};
       
       client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
-          newBlock, newNodes);
+          newBlock, newNodes, storageIDs);
       out.close();
     }
 
@@ -727,10 +729,10 @@ public class TestRetryCacheWithHA {
           .getINode4Write(file).asFile();
       BlockInfoUnderConstruction blkUC = 
           (BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
-      int datanodeNum = blkUC.getExpectedLocations().length;
+      int datanodeNum = blkUC.getExpectedStorageLocations().length;
       for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
         Thread.sleep(1000);
-        datanodeNum = blkUC.getExpectedLocations().length;
+        datanodeNum = blkUC.getExpectedStorageLocations().length;
       }
       return datanodeNum == 2;
     }
@@ -759,7 +761,7 @@ public class TestRetryCacheWithHA {
 
     @Override
     void invoke() throws Exception {
-      result = client.addCacheDirective(directive);
+      result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
@@ -801,7 +803,7 @@ public class TestRetryCacheWithHA {
     @Override
     void prepare() throws Exception {
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
-      id = client.addCacheDirective(directive);
+      id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
@@ -810,7 +812,7 @@ public class TestRetryCacheWithHA {
           new CacheDirectiveInfo.Builder().
               setId(id).
               setReplication(newReplication).
-              build());
+              build(), EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
@@ -857,7 +859,7 @@ public class TestRetryCacheWithHA {
     @Override
     void prepare() throws Exception {
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
-      id = dfs.addCacheDirective(directive);
+      id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
@@ -935,19 +937,19 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
-      client.addCachePool(new CachePoolInfo(pool).setWeight(10));
+      client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
     }
 
     @Override
     void invoke() throws Exception {
-      client.modifyCachePool(new CachePoolInfo(pool).setWeight(99));
+      client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
     }
 
     @Override
     boolean checkNamenodeBeforeReturn() throws Exception {
       for (int i = 0; i < CHECKTIMES; i++) {
         RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
-        if (iter.hasNext() && iter.next().getInfo().getWeight() == 99) {
+        if (iter.hasNext() && (long)iter.next().getInfo().getLimit() == 99) {
           return true;
         }
         Thread.sleep(1000);
@@ -1215,7 +1217,7 @@ public class TestRetryCacheWithHA {
       CacheDirectiveInfo directiveInfo =
         new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
       dfs.addCachePool(new CachePoolInfo(poolName));
-      dfs.addCacheDirective(directiveInfo);
+      dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
       poolNames.add(poolName);
     }
     listCacheDirectives(poolNames, 0);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Fri Jan  3 07:26:52 2014
@@ -228,7 +228,7 @@ public class TestNameNodeMetrics {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
@@ -279,7 +279,7 @@ public class TestNameNodeMetrics {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
@@ -434,7 +434,8 @@ public class TestNameNodeMetrics {
     // We have one sync when the cluster starts up, just opening the journal
     assertCounter("SyncsNumOps", 1L, rb);
     // Each datanode reports in when the cluster comes up
-    assertCounter("BlockReportNumOps", (long)DATANODE_COUNT, rb);
+    assertCounter("BlockReportNumOps",
+                  (long)DATANODE_COUNT*MiniDFSCluster.DIRS_PER_DATANODE, rb);
     
     // Sleep for an interval+slop to let the percentiles rollover
     Thread.sleep((PERCENTILES_INTERVAL+1)*1000);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java Fri Jan  3 07:26:52 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java Fri Jan  3 07:26:52 2014
@@ -358,7 +358,7 @@ public class TestNestedSnapshots {
     
     FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
     INode subNode = fsdir.getINode(sub.toString());
-    assertTrue(subNode instanceof INodeDirectoryWithSnapshot);
+    assertTrue(subNode.asDirectory().isWithSnapshot());
     
     hdfs.allowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());
@@ -366,6 +366,6 @@ public class TestNestedSnapshots {
     
     hdfs.disallowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());
-    assertTrue(subNode instanceof INodeDirectoryWithSnapshot);
+    assertTrue(subNode.asDirectory().isWithSnapshot());
   }
 }



Mime
View raw message