hadoop-hdfs-commits mailing list archives

From: w...@apache.org
Subject: svn commit: r1560522 [8/9] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/m...
Date: Wed, 22 Jan 2014 21:43:04 GMT
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Wed Jan 22 21:43:00 2014
@@ -956,8 +956,8 @@ public class NNThroughputBenchmark imple
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0, 0, 0).getCommands();
+      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+          0L, 0L, 0, 0, 0).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1004,7 +1004,7 @@ public class NNThroughputBenchmark imple
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0, 0, 0).getCommands();
+          rep, 0L, 0L, 0, 0, 0).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

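Both hunks above adjust callers to a wider DatanodeProtocol.sendHeartbeat signature: two new long arguments now precede the int counters, and judging by the NameNodeAdapter change below they carry the datanode's cache capacity and cache usage for the new HDFS caching feature. A minimal sketch of the widened call follows; the parameter meanings in the comments are assumptions drawn from the surrounding changes, not taken verbatim from this patch:

import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;

class HeartbeatSketch {
  static DatanodeCommand[] heartbeat(DatanodeProtocol nameNodeProto,
      DatanodeRegistration dnRegistration, StorageReport[] rep)
      throws IOException {
    return nameNodeProto.sendHeartbeat(
        dnRegistration, // datanode identity
        rep,            // per-storage capacity/usage reports
        0L,             // assumed: cache capacity in bytes (new argument)
        0L,             // assumed: cache bytes currently in use (new argument)
        0, 0, 0)        // transfers in progress, xceiver count, failed volumes
        .getCommands(); // commands the NameNode wants the datanode to run
  }
}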
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Jan 22 21:43:00 2014
@@ -114,7 +114,7 @@ public class NameNodeAdapter {
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        0, 0, 0);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

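With the adapter now forwarding the descriptor's real cache figures instead of zeros, simulated heartbeats stay consistent with the node's registered cache state. The file added below is the branch-2 backport of the TestCacheDirectives suite, which drives the centralized cache management API end to end. For orientation, the core client calls it exercises reduce to roughly the following sequence; this is a sketch assuming a running cluster, and the class and path names are illustrative only:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

class CacheDirectiveSketch {
  static void demo(DistributedFileSystem dfs) throws Exception {
    // Create a pool, cache a file into it, then clean up.
    dfs.addCachePool(new CachePoolInfo("pool1")
        .setMode(new FsPermission((short) 0755)));
    long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
        .setPath(new Path("/file"))   // file or directory to cache
        .setReplication((short) 1)    // number of cached replicas to keep
        .setPool("pool1")             // pool that accounts for the cached bytes
        .build());
    dfs.removeCacheDirective(id);     // stop caching the path
    dfs.removeCachePool("pool1");     // delete the pool
  }
}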
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,1393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.GSet;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+public class TestCacheDirectives {
+  static final Log LOG = LogFactory.getLog(TestCacheDirectives.class);
+
+  private static final UserGroupInformation unprivilegedUser =
+      UserGroupInformation.createRemoteUser("unprivilegedUser");
+
+  static private Configuration conf;
+  static private MiniDFSCluster cluster;
+  static private DistributedFileSystem dfs;
+  static private NamenodeProtocols proto;
+  static private NameNode namenode;
+  static private CacheManipulator prevCacheManipulator;
+
+  static {
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
+  }
+
+  private static final long BLOCK_SIZE = 4096;
+  private static final int NUM_DATANODES = 4;
+  // Most Linux installs will allow non-root users to lock 64KB.
+  // In this test though, we stub out mlock so this doesn't matter.
+  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+  private static HdfsConfiguration createCachingConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
+    // set low limits here for testing purposes
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
+        2);
+
+    return conf;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = createCachingConf();
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    proto = cluster.getNameNodeRpc();
+    namenode = cluster.getNameNode();
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(CacheManager.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+  }
+
+  @Test(timeout=60000)
+  public void testBasicPoolOperations() throws Exception {
+    final String poolName = "pool1";
+    CachePoolInfo info = new CachePoolInfo(poolName).
+        setOwnerName("bob").setGroupName("bobgroup").
+        setMode(new FsPermission((short)0755)).setLimit(150L);
+
+    // Add a pool
+    dfs.addCachePool(info);
+
+    // Do some bad addCachePools
+    try {
+      dfs.addCachePool(info);
+      fail("added the pool with the same name twice");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("pool1 already exists", ioe);
+    }
+    try {
+      dfs.addCachePool(new CachePoolInfo(""));
+      fail("added empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      dfs.addCachePool(null);
+      fail("added null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+    try {
+      proto.addCachePool(new CachePoolInfo(""));
+      fail("added empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.addCachePool(null);
+      fail("added null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+
+    // Modify the pool
+    info.setOwnerName("jane").setGroupName("janegroup")
+        .setMode(new FsPermission((short)0700)).setLimit(314L);
+    dfs.modifyCachePool(info);
+
+    // Do some invalid modify pools
+    try {
+      dfs.modifyCachePool(new CachePoolInfo("fool"));
+      fail("modified non-existent cache pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("fool does not exist", ioe);
+    }
+    try {
+      dfs.modifyCachePool(new CachePoolInfo(""));
+      fail("modified empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      dfs.modifyCachePool(null);
+      fail("modified null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+    try {
+      proto.modifyCachePool(new CachePoolInfo(""));
+      fail("modified empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.modifyCachePool(null);
+      fail("modified null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+    }
+
+    // Remove the pool
+    dfs.removeCachePool(poolName);
+    // Do some bad removePools
+    try {
+      dfs.removeCachePool("pool99");
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove " +
+          "non-existent cache pool", ioe);
+    }
+    try {
+      dfs.removeCachePool(poolName);
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove " +
+          "non-existent cache pool", ioe);
+    }
+    try {
+      dfs.removeCachePool("");
+      fail("removed empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      dfs.removeCachePool(null);
+      fail("removed null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.removeCachePool("");
+      fail("removed empty pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+    try {
+      proto.removeCachePool(null);
+      fail("removed null pool");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+          ioe);
+    }
+
+    info = new CachePoolInfo("pool2");
+    dfs.addCachePool(info);
+  }
+
+  @Test(timeout=60000)
+  public void testCreateAndModifyPools() throws Exception {
+    String poolName = "pool1";
+    String ownerName = "abc";
+    String groupName = "123";
+    FsPermission mode = new FsPermission((short)0755);
+    long limit = 150;
+    dfs.addCachePool(new CachePoolInfo(poolName).
+        setOwnerName(ownerName).setGroupName(groupName).
+        setMode(mode).setLimit(limit));
+    
+    RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
+    CachePoolInfo info = iter.next().getInfo();
+    assertEquals(poolName, info.getPoolName());
+    assertEquals(ownerName, info.getOwnerName());
+    assertEquals(groupName, info.getGroupName());
+
+    ownerName = "def";
+    groupName = "456";
+    mode = new FsPermission((short)0700);
+    limit = 151;
+    dfs.modifyCachePool(new CachePoolInfo(poolName).
+        setOwnerName(ownerName).setGroupName(groupName).
+        setMode(mode).setLimit(limit));
+
+    iter = dfs.listCachePools();
+    info = iter.next().getInfo();
+    assertEquals(poolName, info.getPoolName());
+    assertEquals(ownerName, info.getOwnerName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(limit, (long)info.getLimit());
+
+    dfs.removeCachePool(poolName);
+    iter = dfs.listCachePools();
+    assertFalse("expected no cache pools after deleting pool", iter.hasNext());
+
+    proto.listCachePools(null);
+
+    try {
+      proto.removeCachePool("pool99");
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
+          ioe);
+    }
+    try {
+      proto.removeCachePool(poolName);
+      fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
+          ioe);
+    }
+
+    iter = dfs.listCachePools();
+    assertFalse("expected no cache pools after deleting pool", iter.hasNext());
+  }
+
+  private static void validateListAll(
+      RemoteIterator<CacheDirectiveEntry> iter,
+      Long... ids) throws Exception {
+    for (Long id: ids) {
+      assertTrue("Unexpectedly few elements", iter.hasNext());
+      assertEquals("Unexpected directive ID", id,
+          iter.next().getInfo().getId());
+    }
+    assertFalse("Unexpectedly many list elements", iter.hasNext());
+  }
+
+  private static long addAsUnprivileged(
+      final CacheDirectiveInfo directive) throws Exception {
+    return unprivilegedUser
+        .doAs(new PrivilegedExceptionAction<Long>() {
+          @Override
+          public Long run() throws IOException {
+            DistributedFileSystem myDfs =
+                (DistributedFileSystem) FileSystem.get(conf);
+            return myDfs.addCacheDirective(directive);
+          }
+        });
+  }
+
+  @Test(timeout=60000)
+  public void testAddRemoveDirectives() throws Exception {
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setMode(new FsPermission((short)0777)));
+    proto.addCachePool(new CachePoolInfo("pool2").
+        setMode(new FsPermission((short)0777)));
+    proto.addCachePool(new CachePoolInfo("pool3").
+        setMode(new FsPermission((short)0777)));
+    proto.addCachePool(new CachePoolInfo("pool4").
+        setMode(new FsPermission((short)0)));
+
+    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/alpha")).
+        setPool("pool1").
+        build();
+    CacheDirectiveInfo beta = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/beta")).
+        setPool("pool2").
+        build();
+    CacheDirectiveInfo delta = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/delta")).
+        setPool("pool1").
+        build();
+
+    long alphaId = addAsUnprivileged(alpha);
+    long alphaId2 = addAsUnprivileged(alpha);
+    assertFalse("Expected to get unique directives when re-adding an "
+        + "existing CacheDirectiveInfo",
+        alphaId == alphaId2);
+    long betaId = addAsUnprivileged(beta);
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/unicorn")).
+          setPool("no_such_pool").
+          build());
+      fail("expected an error when adding to a non-existent pool.");
+    } catch (InvalidRequestException ioe) {
+      GenericTestUtils.assertExceptionContains("Unknown pool", ioe);
+    }
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/blackhole")).
+          setPool("pool4").
+          build());
+      fail("expected an error when adding to a pool with " +
+          "mode 0 (no permissions for anyone).");
+    } catch (AccessControlException e) {
+      GenericTestUtils.
+          assertExceptionContains("Permission denied while accessing pool", e);
+    }
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/illegal:path/")).
+          setPool("pool1").
+          build());
+      fail("expected an error when adding a malformed path " +
+          "to the cache directives.");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains("is not a valid DFS filename", e);
+    }
+
+    try {
+      addAsUnprivileged(new CacheDirectiveInfo.Builder().
+          setPath(new Path("/emptypoolname")).
+          setReplication((short)1).
+          setPool("").
+          build());
+      fail("expected an error when adding a cache " +
+          "directive with an empty pool name.");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("Invalid empty pool name", e);
+    }
+
+    long deltaId = addAsUnprivileged(delta);
+
+    // We expect the following to succeed, because DistributedFileSystem
+    // qualifies the path.
+    long relativeId = addAsUnprivileged(
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("relative")).
+            setPool("pool1").
+            build());
+
+    RemoteIterator<CacheDirectiveEntry> iter;
+    iter = dfs.listCacheDirectives(null);
+    validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId);
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool3").build());
+    assertFalse(iter.hasNext());
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool1").build());
+    validateListAll(iter, alphaId, alphaId2, deltaId, relativeId);
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool2").build());
+    validateListAll(iter, betaId);
+
+    dfs.removeCacheDirective(betaId);
+    iter = dfs.listCacheDirectives(
+        new CacheDirectiveInfo.Builder().setPool("pool2").build());
+    assertFalse(iter.hasNext());
+
+    try {
+      dfs.removeCacheDirective(betaId);
+      fail("expected an error when removing a non-existent ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("No directive with ID", e);
+    }
+
+    try {
+      proto.removeCacheDirective(-42L);
+      fail("expected an error when removing a negative ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Invalid negative ID", e);
+    }
+    try {
+      proto.removeCacheDirective(43L);
+      fail("expected an error when removing a non-existent ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("No directive with ID", e);
+    }
+
+    dfs.removeCacheDirective(alphaId);
+    dfs.removeCacheDirective(alphaId2);
+    dfs.removeCacheDirective(deltaId);
+
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().
+        setId(relativeId).
+        setReplication((short)555).
+        build());
+    iter = dfs.listCacheDirectives(null);
+    assertTrue(iter.hasNext());
+    CacheDirectiveInfo modified = iter.next().getInfo();
+    assertEquals(relativeId, modified.getId().longValue());
+    assertEquals((short)555, modified.getReplication().shortValue());
+    dfs.removeCacheDirective(relativeId);
+    iter = dfs.listCacheDirectives(null);
+    assertFalse(iter.hasNext());
+
+    // Verify that path-based cache directives with path "." work correctly
+    CacheDirectiveInfo directive =
+        new CacheDirectiveInfo.Builder().setPath(new Path("."))
+            .setPool("pool1").build();
+    long id = dfs.addCacheDirective(directive);
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(
+        directive).setId(id).setReplication((short)2).build());
+    dfs.removeCacheDirective(id);
+  }
+
+  @Test(timeout=60000)
+  public void testCacheManagerRestart() throws Exception {
+    // Create and validate a pool
+    final String pool = "poolparty";
+    String groupName = "partygroup";
+    FsPermission mode = new FsPermission((short)0777);
+    long limit = 747;
+    dfs.addCachePool(new CachePoolInfo(pool)
+        .setGroupName(groupName)
+        .setMode(mode)
+        .setLimit(limit));
+    RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    CachePoolInfo info = pit.next().getInfo();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(limit, (long)info.getLimit());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
+  
+    // Create some cache entries
+    int numEntries = 10;
+    String entryPrefix = "/party-";
+    long prevId = -1;
+    final Date expiry = new Date();
+    for (int i=0; i<numEntries; i++) {
+      prevId = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path(entryPrefix + i)).setPool(pool).
+            setExpiration(
+                CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
+            build());
+    }
+    RemoteIterator<CacheDirectiveEntry> dit
+        = dfs.listCacheDirectives(null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      CacheDirectiveInfo cd = dit.next().getInfo();
+      assertEquals(i+1, cd.getId().longValue());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache directives found", dit.hasNext());
+  
+    // Restart namenode
+    cluster.restartNameNode();
+  
+    // Check that state came back up
+    pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    info = pit.next().getInfo();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(limit, (long)info.getLimit());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
+  
+    dit = dfs.listCacheDirectives(null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      CacheDirectiveInfo cd = dit.next().getInfo();
+      assertEquals(i+1, cd.getId().longValue());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+      assertEquals(pool, cd.getPool());
+      assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
+    }
+    assertFalse("Unexpected # of cache directives found", dit.hasNext());
+
+    long nextId = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foobar")).setPool(pool).build());
+    assertEquals(prevId + 1, nextId);
+  }
+
+  /**
+   * Wait for the NameNode to have an expected number of cached blocks
+   * and replicas.
+   * @param nn NameNode
+   * @param expectedCachedBlocks if -1, treat as wildcard
+   * @param expectedCachedReplicas if -1, treat as wildcard
+   * @param logString prefix used in the progress log messages
+   * @throws Exception
+   */
+  private static void waitForCachedBlocks(NameNode nn,
+      final int expectedCachedBlocks, final int expectedCachedReplicas,
+      final String logString) throws Exception {
+    final FSNamesystem namesystem = nn.getNamesystem();
+    final CacheManager cacheManager = namesystem.getCacheManager();
+    LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
+             expectedCachedReplicas + " replicas.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int numCachedBlocks = 0, numCachedReplicas = 0;
+        namesystem.readLock();
+        try {
+          GSet<CachedBlock, CachedBlock> cachedBlocks =
+              cacheManager.getCachedBlocks();
+          if (cachedBlocks != null) {
+            for (Iterator<CachedBlock> iter = cachedBlocks.iterator();
+                iter.hasNext(); ) {
+              CachedBlock cachedBlock = iter.next();
+              numCachedBlocks++;
+              numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size();
+            }
+          }
+        } finally {
+          namesystem.readUnlock();
+        }
+        if (expectedCachedBlocks == -1 ||
+            numCachedBlocks == expectedCachedBlocks) {
+          if (expectedCachedReplicas == -1 ||
+              numCachedReplicas == expectedCachedReplicas) {
+            return true;
+          }
+        }
+        LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+            " / " + expectedCachedBlocks + ".  " +
+            "cached replicas: have " + numCachedReplicas +
+            " / " + expectedCachedReplicas);
+        return false;
+      }
+    }, 500, 60000);
+  }
+
+  private static void waitForCacheDirectiveStats(final DistributedFileSystem dfs,
+      final long targetBytesNeeded, final long targetBytesCached,
+      final long targetFilesNeeded, final long targetFilesCached,
+      final CacheDirectiveInfo filter, final String infoString)
+            throws Exception {
+    LOG.info("Polling listCacheDirectives " + 
+        ((filter == null) ? "ALL" : filter.toString()) + " for " +
+        targetBytesNeeded + " targetBytesNeeded, " +
+        targetBytesCached + " targetBytesCached, " +
+        targetFilesNeeded + " targetFilesNeeded, " +
+        targetFilesCached + " targetFilesCached");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RemoteIterator<CacheDirectiveEntry> iter = null;
+        CacheDirectiveEntry entry = null;
+        try {
+          iter = dfs.listCacheDirectives(filter);
+          entry = iter.next();
+        } catch (IOException e) {
+          fail("got IOException while calling " +
+              "listCacheDirectives: " + e.getMessage());
+        }
+        Assert.assertNotNull(entry);
+        CacheDirectiveStats stats = entry.getStats();
+        if ((targetBytesNeeded == stats.getBytesNeeded()) &&
+            (targetBytesCached == stats.getBytesCached()) &&
+            (targetFilesNeeded == stats.getFilesNeeded()) &&
+            (targetFilesCached == stats.getFilesCached())) {
+          return true;
+        } else {
+          LOG.info(infoString + ": " +
+              "filesNeeded: " +
+              stats.getFilesNeeded() + "/" + targetFilesNeeded +
+              ", filesCached: " + 
+              stats.getFilesCached() + "/" + targetFilesCached +
+              ", bytesNeeded: " +
+              stats.getBytesNeeded() + "/" + targetBytesNeeded +
+              ", bytesCached: " + 
+              stats.getBytesCached() + "/" + targetBytesCached);
+          return false;
+        }
+      }
+    }, 500, 60000);
+  }
+
+  private static void waitForCachePoolStats(final DistributedFileSystem dfs,
+      final long targetBytesNeeded, final long targetBytesCached,
+      final long targetFilesNeeded, final long targetFilesCached,
+      final CachePoolInfo pool, final String infoString)
+            throws Exception {
+    LOG.info("Polling listCachePools " + pool.toString() + " for " +
+        targetBytesNeeded + " targetBytesNeeded, " +
+        targetBytesCached + " targetBytesCached, " +
+        targetFilesNeeded + " targetFilesNeeded, " +
+        targetFilesCached + " targetFilesCached");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RemoteIterator<CachePoolEntry> iter = null;
+        try {
+          iter = dfs.listCachePools();
+        } catch (IOException e) {
+          fail("got IOException while calling " +
+              "listCachePools: " + e.getMessage());
+        }
+        while (true) {
+          CachePoolEntry entry = null;
+          try {
+            if (!iter.hasNext()) {
+              break;
+            }
+            entry = iter.next();
+          } catch (IOException e) {
+            fail("got IOException while iterating through " +
+                "listCachePools: " + e.getMessage());
+          }
+          if (entry == null) {
+            break;
+          }
+          if (!entry.getInfo().getPoolName().equals(pool.getPoolName())) {
+            continue;
+          }
+          CachePoolStats stats = entry.getStats();
+          if ((targetBytesNeeded == stats.getBytesNeeded()) &&
+              (targetBytesCached == stats.getBytesCached()) &&
+              (targetFilesNeeded == stats.getFilesNeeded()) &&
+              (targetFilesCached == stats.getFilesCached())) {
+            return true;
+          } else {
+            LOG.info(infoString + ": " +
+                "filesNeeded: " +
+                stats.getFilesNeeded() + "/" + targetFilesNeeded +
+                ", filesCached: " + 
+                stats.getFilesCached() + "/" + targetFilesCached +
+                ", bytesNeeded: " +
+                stats.getBytesNeeded() + "/" + targetBytesNeeded +
+                ", bytesCached: " + 
+                stats.getBytesCached() + "/" + targetBytesCached);
+            return false;
+          }
+        }
+        return false;
+      }
+    }, 500, 60000);
+  }
+
+  private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+      final List<Path> paths, final int expectedBlocks,
+      final int expectedReplicas)
+      throws Exception {
+    int numCachedBlocks = 0;
+    int numCachedReplicas = 0;
+    for (Path p: paths) {
+      final FileStatus f = dfs.getFileStatus(p);
+      final long len = f.getLen();
+      final long blockSize = f.getBlockSize();
+      // round it up to full blocks
+      final long numBlocks = (len + blockSize - 1) / blockSize;
+      BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
+      assertEquals("Unexpected number of block locations for path " + p,
+          numBlocks, locs.length);
+      for (BlockLocation l: locs) {
+        if (l.getCachedHosts().length > 0) {
+          numCachedBlocks++;
+        }
+        numCachedReplicas += l.getCachedHosts().length;
+      }
+    }
+    LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
+    LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas
+        + " replicas");
+    assertEquals("Unexpected number of cached blocks", expectedBlocks,
+        numCachedBlocks);
+    assertEquals("Unexpected number of cached replicas", expectedReplicas,
+        numCachedReplicas);
+  }
+
+  @Test(timeout=120000)
+  public void testWaitForCachedReplicas() throws Exception {
+    FileSystemTestHelper helper = new FileSystemTestHelper();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return ((namenode.getNamesystem().getCacheCapacity() ==
+            (NUM_DATANODES * CACHE_CAPACITY)) &&
+              (namenode.getNamesystem().getCacheUsed() == 0));
+      }
+    }, 500, 60000);
+
+    // Send a cache report referring to a bogus block.  It is important that
+    // the NameNode be robust against this.
+    NamenodeProtocols nnRpc = namenode.getRpcServer();
+    DataNode dn0 = cluster.getDataNodes().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    LinkedList<Long> bogusBlockIds = new LinkedList<Long>();
+    bogusBlockIds.add(999999L);
+    nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds);
+
+    Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+    // Create the pool
+    final String pool = "friendlyPool";
+    nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
+    // Create some test files
+    final int numFiles = 2;
+    final int numBlocksPerFile = 2;
+    final List<String> paths = new ArrayList<String>(numFiles);
+    for (int i=0; i<numFiles; i++) {
+      Path p = new Path(rootDir, "testCachePaths-" + i);
+      FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+          (int)BLOCK_SIZE);
+      paths.add(p.toUri().getPath());
+    }
+    // Check the initial statistics at the namenode
+    waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
+    // Cache and check each path in sequence
+    int expected = 0;
+    for (int i=0; i<numFiles; i++) {
+      CacheDirectiveInfo directive =
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path(paths.get(i))).
+            setPool(pool).
+            build();
+      nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class));
+      expected += numBlocksPerFile;
+      waitForCachedBlocks(namenode, expected, expected,
+          "testWaitForCachedReplicas:1");
+    }
+
+    // Check that the datanodes have the right cache values
+    DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+    assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length);
+    long totalUsed = 0;
+    for (DatanodeInfo dn : live) {
+      final long cacheCapacity = dn.getCacheCapacity();
+      final long cacheUsed = dn.getCacheUsed();
+      final long cacheRemaining = dn.getCacheRemaining();
+      assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+      assertEquals("Capacity not equal to used + remaining",
+          cacheCapacity, cacheUsed + cacheRemaining);
+      assertEquals("Remaining not equal to capacity - used",
+          cacheCapacity - cacheUsed, cacheRemaining);
+      totalUsed += cacheUsed;
+    }
+    assertEquals(expected*BLOCK_SIZE, totalUsed);
+
+    // Uncache and check each path in sequence
+    RemoteIterator<CacheDirectiveEntry> entries =
+      new CacheDirectiveIterator(nnRpc, null);
+    for (int i=0; i<numFiles; i++) {
+      CacheDirectiveEntry entry = entries.next();
+      nnRpc.removeCacheDirective(entry.getInfo().getId());
+      expected -= numBlocksPerFile;
+      waitForCachedBlocks(namenode, expected, expected,
+          "testWaitForCachedReplicas:2");
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testWaitForCachedReplicasInDirectory() throws Exception {
+    // Create the pool
+    final String pool = "friendlyPool";
+    final CachePoolInfo poolInfo = new CachePoolInfo(pool);
+    dfs.addCachePool(poolInfo);
+    // Create some test files
+    final List<Path> paths = new LinkedList<Path>();
+    paths.add(new Path("/foo/bar"));
+    paths.add(new Path("/foo/baz"));
+    paths.add(new Path("/foo2/bar2"));
+    paths.add(new Path("/foo2/baz2"));
+    dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+    dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+    final int numBlocksPerFile = 2;
+    for (Path path : paths) {
+      FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+          (int)BLOCK_SIZE, (short)3, false);
+    }
+    waitForCachedBlocks(namenode, 0, 0,
+        "testWaitForCachedReplicasInDirectory:0");
+
+    // cache entire directory
+    long id = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            setReplication((short)2).
+            setPool(pool).
+            build());
+    waitForCachedBlocks(namenode, 4, 8,
+        "testWaitForCachedReplicasInDirectory:1:blocks");
+    // Verify that listDirectives gives the stats we want.
+    waitForCacheDirectiveStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+        2, 2,
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            build(),
+        "testWaitForCachedReplicasInDirectory:1:directive");
+    waitForCachePoolStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+        2, 2,
+        poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
+
+    long id2 = dfs.addCacheDirective(
+          new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo/bar")).
+            setReplication((short)4).
+            setPool(pool).
+            build());
+    // wait for an additional 2 cached replicas to come up
+    waitForCachedBlocks(namenode, 4, 10,
+        "testWaitForCachedReplicasInDirectory:2:blocks");
+    // the directory directive's stats are unchanged
+    waitForCacheDirectiveStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+        2, 2,
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo")).
+            build(),
+        "testWaitForCachedReplicasInDirectory:2:directive-1");
+    // verify /foo/bar's stats
+    waitForCacheDirectiveStats(dfs,
+        4 * numBlocksPerFile * BLOCK_SIZE,
+        // only 3 because the file only has 3 replicas, not 4 as requested.
+        3 * numBlocksPerFile * BLOCK_SIZE,
+        1,
+        // only 0 because the file can't be fully cached
+        0,
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/foo/bar")).
+            build(),
+        "testWaitForCachedReplicasInDirectory:2:directive-2");
+    waitForCachePoolStats(dfs,
+        (4+4) * numBlocksPerFile * BLOCK_SIZE,
+        (4+3) * numBlocksPerFile * BLOCK_SIZE,
+        3, 2,
+        poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
+    // remove and watch numCached go to 0
+    dfs.removeCacheDirective(id);
+    dfs.removeCacheDirective(id2);
+    waitForCachedBlocks(namenode, 0, 0,
+        "testWaitForCachedReplicasInDirectory:3:blocks");
+    waitForCachePoolStats(dfs,
+        0, 0,
+        0, 0,
+        poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
+  }
+
+  /**
+   * Tests stepping the cache replication factor up and down, checking the
+   * number of cached replicas and blocks as well as the advertised locations.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testReplicationFactor() throws Exception {
+    // Create the pool
+    final String pool = "friendlyPool";
+    dfs.addCachePool(new CachePoolInfo(pool));
+    // Create some test files
+    final List<Path> paths = new LinkedList<Path>();
+    paths.add(new Path("/foo/bar"));
+    paths.add(new Path("/foo/baz"));
+    paths.add(new Path("/foo2/bar2"));
+    paths.add(new Path("/foo2/baz2"));
+    dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+    dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+    final int numBlocksPerFile = 2;
+    for (Path path : paths) {
+      FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+          (int)BLOCK_SIZE, (short)3, false);
+    }
+    waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
+    checkNumCachedReplicas(dfs, paths, 0, 0);
+    // cache directory
+    long id = dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().
+          setPath(new Path("/foo")).
+          setReplication((short)1).
+          setPool(pool).
+          build());
+    waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
+    checkNumCachedReplicas(dfs, paths, 4, 4);
+    // step up the replication factor
+    for (int i=2; i<=3; i++) {
+      dfs.modifyCacheDirective(
+          new CacheDirectiveInfo.Builder().
+          setId(id).
+          setReplication((short)i).
+          build());
+      waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
+      checkNumCachedReplicas(dfs, paths, 4, 4*i);
+    }
+    // step it down
+    for (int i=2; i>=1; i--) {
+      dfs.modifyCacheDirective(
+          new CacheDirectiveInfo.Builder().
+          setId(id).
+          setReplication((short)i).
+          build());
+      waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
+      checkNumCachedReplicas(dfs, paths, 4, 4*i);
+    }
+    // remove and watch numCached go to 0
+    dfs.removeCacheDirective(id);
+    waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
+    checkNumCachedReplicas(dfs, paths, 0, 0);
+  }
+
+  @Test(timeout=60000)
+  public void testListCachePoolPermissions() throws Exception {
+    final UserGroupInformation myUser = UserGroupInformation
+        .createRemoteUser("myuser");
+    final DistributedFileSystem myDfs = 
+        (DistributedFileSystem)DFSTestUtil.getFileSystemAs(myUser, conf);
+    final String poolName = "poolparty";
+    dfs.addCachePool(new CachePoolInfo(poolName)
+        .setMode(new FsPermission((short)0700)));
+    // Should only see partial info
+    RemoteIterator<CachePoolEntry> it = myDfs.listCachePools();
+    CachePoolInfo info = it.next().getInfo();
+    assertFalse(it.hasNext());
+    assertEquals("Expected pool name", poolName, info.getPoolName());
+    assertNull("Unexpected owner name", info.getOwnerName());
+    assertNull("Unexpected group name", info.getGroupName());
+    assertNull("Unexpected mode", info.getMode());
+    assertNull("Unexpected limit", info.getLimit());
+    // Modify the pool so myuser is now the owner
+    final long limit = 99;
+    dfs.modifyCachePool(new CachePoolInfo(poolName)
+        .setOwnerName(myUser.getShortUserName())
+        .setLimit(limit));
+    // Should see full info
+    it = myDfs.listCachePools();
+    info = it.next().getInfo();
+    assertFalse(it.hasNext());
+    assertEquals("Expected pool name", poolName, info.getPoolName());
+    assertEquals("Mismatched owner name", myUser.getShortUserName(),
+        info.getOwnerName());
+    assertNotNull("Expected group name", info.getGroupName());
+    assertEquals("Mismatched mode", (short) 0700,
+        info.getMode().toShort());
+    assertEquals("Mismatched limit", limit, (long)info.getLimit());
+  }
+
+  @Test(timeout=120000)
+  public void testExpiry() throws Exception {
+    String pool = "pool1";
+    dfs.addCachePool(new CachePoolInfo(pool));
+    Path p = new Path("/mypath");
+    DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
+    // Expire after test timeout
+    Date start = new Date();
+    Date expiry = DateUtils.addSeconds(start, 120);
+    final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPath(p)
+        .setPool(pool)
+        .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
+        .setReplication((short)2)
+        .build());
+    waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
+    // Change it to expire sooner
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+        .setExpiration(Expiration.newRelative(0)).build());
+    waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
+    RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
+    CacheDirectiveEntry ent = it.next();
+    assertFalse(it.hasNext());
+    Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+    assertTrue("Directive should have expired",
+        entryExpiry.before(new Date()));
+    // Change it back to expire later
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+        .setExpiration(Expiration.newRelative(120000)).build());
+    waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
+    it = dfs.listCacheDirectives(null);
+    ent = it.next();
+    assertFalse(it.hasNext());
+    entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+    assertTrue("Directive should not have expired",
+        entryExpiry.after(new Date()));
+    // Verify that setting a negative TTL throws an error
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+          .setExpiration(Expiration.newRelative(-1)).build());
+      fail("Expected an error when setting a negative TTL");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils
+          .assertExceptionContains("Cannot set a negative expiration", e);
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testLimit() throws Exception {
+    try {
+      dfs.addCachePool(new CachePoolInfo("poolofnegativity").setLimit(-99l));
+      fail("Should not be able to set a negative limit");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("negative", e);
+    }
+    final String destiny = "poolofdestiny";
+    final Path path1 = new Path("/destiny");
+    DFSTestUtil.createFile(dfs, path1, 2*BLOCK_SIZE, (short)1, 0x9494);
+    // Start off with a limit that is too small
+    final CachePoolInfo poolInfo = new CachePoolInfo(destiny)
+        .setLimit(2*BLOCK_SIZE-1);
+    dfs.addCachePool(poolInfo);
+    final CacheDirectiveInfo info1 = new CacheDirectiveInfo.Builder()
+        .setPool(destiny).setPath(path1).build();
+    try {
+      dfs.addCacheDirective(info1);
+      fail("Should not be able to cache when there is no more limit");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("remaining capacity", e);
+    }
+    // Raise the limit up to fit and it should work this time
+    poolInfo.setLimit(2*BLOCK_SIZE);
+    dfs.modifyCachePool(poolInfo);
+    long id1 = dfs.addCacheDirective(info1);
+    waitForCachePoolStats(dfs,
+        2*BLOCK_SIZE, 2*BLOCK_SIZE,
+        1, 1,
+        poolInfo, "testLimit:1");
+    // Adding another file, it shouldn't be cached
+    final Path path2 = new Path("/failure");
+    DFSTestUtil.createFile(dfs, path2, BLOCK_SIZE, (short)1, 0x9495);
+    try {
+      dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+          .setPool(destiny).setPath(path2).build(),
+          EnumSet.noneOf(CacheFlag.class));
+      fail("Should not be able to add another cached file");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("remaining capacity", e);
+    }
+    // Bring the limit down, the first file should get uncached
+    poolInfo.setLimit(BLOCK_SIZE);
+    dfs.modifyCachePool(poolInfo);
+    waitForCachePoolStats(dfs,
+        2*BLOCK_SIZE, 0,
+        1, 0,
+        poolInfo, "testLimit:2");
+    RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
+    assertTrue("Expected a cache pool", it.hasNext());
+    CachePoolStats stats = it.next().getStats();
+    assertEquals("Overlimit bytes should be difference of needed and limit",
+        BLOCK_SIZE, stats.getBytesOverlimit());
+    // Moving a directive to a pool without enough limit should fail
+    CachePoolInfo inadequate =
+        new CachePoolInfo("poolofinadequacy").setLimit(BLOCK_SIZE);
+    dfs.addCachePool(inadequate);
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1)
+          .setId(id1).setPool(inadequate.getPoolName()).build(),
+          EnumSet.noneOf(CacheFlag.class));
+      fail("Should not be able to move to a pool with insufficient capacity");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("remaining capacity", e);
+    }
+    // Succeeds when force=true
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1).setId(id1)
+        .setPool(inadequate.getPoolName()).build(),
+        EnumSet.of(CacheFlag.FORCE));
+    // Also can add with force=true
+    dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName())
+            .setPath(path1).build(), EnumSet.of(CacheFlag.FORCE));
+  }
+
+  @Test(timeout=30000)
+  public void testMaxRelativeExpiry() throws Exception {
+    // Test that negative and really big max expirations can't be set during add
+    try {
+      dfs.addCachePool(new CachePoolInfo("failpool").setMaxRelativeExpiryMs(-1l));
+      fail("Added a pool with a negative max expiry.");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("negative", e);
+    }
+    try {
+      dfs.addCachePool(new CachePoolInfo("failpool")
+          .setMaxRelativeExpiryMs(Long.MAX_VALUE - 1));
+      fail("Added a pool with too big of a max expiry.");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("too big", e);
+    }
+    // Test that setting a max relative expiry on a pool works
+    CachePoolInfo coolPool = new CachePoolInfo("coolPool");
+    final long poolExpiration = 1000 * 60 * 10L;
+    dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(poolExpiration));
+    RemoteIterator<CachePoolEntry> poolIt = dfs.listCachePools();
+    CachePoolInfo listPool = poolIt.next().getInfo();
+    assertFalse("Should only be one pool", poolIt.hasNext());
+    assertEquals("Expected max relative expiry to match set value",
+        poolExpiration, listPool.getMaxRelativeExpiryMs().longValue());
+    // Test that negative and really big max expirations can't be modified
+    try {
+      dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(-1L));
+      fail("Added a pool with a negative max expiry.");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("negative", e);
+    }
+    try {
+      dfs.modifyCachePool(coolPool
+          .setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER+1));
+      fail("Added a pool with too big of a max expiry.");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("too big", e);
+    }
+    // Test that adding a directive without an expiration uses the pool's max
+    CacheDirectiveInfo defaultExpiry = new CacheDirectiveInfo.Builder()
+        .setPath(new Path("/blah"))
+        .setPool(coolPool.getPoolName())
+        .build();
+    dfs.addCacheDirective(defaultExpiry);
+    RemoteIterator<CacheDirectiveEntry> dirIt =
+        dfs.listCacheDirectives(defaultExpiry);
+    CacheDirectiveInfo listInfo = dirIt.next().getInfo();
+    assertFalse("Should only have one entry in listing", dirIt.hasNext());
+    long listExpiration = listInfo.getExpiration().getAbsoluteMillis()
+        - new Date().getTime();
+    assertTrue("Directive expiry should be approximately the pool's max expiry",
+        Math.abs(listExpiration - poolExpiration) < 10*1000);
+    // Test that the max is enforced on add for relative and absolute
+    CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder()
+        .setPath(new Path("/lolcat"))
+        .setPool(coolPool.getPoolName());
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newRelative(poolExpiration+1))
+          .build());
+      fail("Added a directive that exceeds pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newAbsolute(
+              new Date().getTime() + poolExpiration + (10*1000)))
+          .build());
+      fail("Added a directive that exceeds pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    // Test that max is enforced on modify for relative and absolute Expirations
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.newRelative(poolExpiration+1))
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.newAbsolute(
+              new Date().getTime() + poolExpiration + (10*1000)))
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    // Test some giant limit values with add
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newRelative(
+              Long.MAX_VALUE))
+          .build());
+      fail("Added a directive with a gigantic max value");
+    } catch (IllegalArgumentException e) {
+      assertExceptionContains("is too far in the future", e);
+    }
+    try {
+      dfs.addCacheDirective(builder
+          .setExpiration(Expiration.newAbsolute(
+              Long.MAX_VALUE))
+          .build());
+      fail("Added a directive with a gigantic max value");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("is too far in the future", e);
+    }
+    // Test some giant limit values with modify
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.NEVER)
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setExpiration(Expiration.newAbsolute(
+              Long.MAX_VALUE))
+          .build());
+      fail("Modified a directive to exceed pool's max relative expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("is too far in the future", e);
+    }
+    // Test that the max is enforced on modify correctly when changing pools
+    CachePoolInfo destPool = new CachePoolInfo("destPool");
+    dfs.addCachePool(destPool.setMaxRelativeExpiryMs(poolExpiration / 2));
+    try {
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+          .setId(listInfo.getId())
+          .setPool(destPool.getPoolName())
+          .build());
+      fail("Modified a directive to a pool with a lower max expiration");
+    } catch (InvalidRequestException e) {
+      assertExceptionContains("exceeds the max relative expiration", e);
+    }
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
+        .setId(listInfo.getId())
+        .setPool(destPool.getPoolName())
+        .setExpiration(Expiration.newRelative(poolExpiration / 2))
+        .build());
+    dirIt = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder()
+        .setPool(destPool.getPoolName())
+        .build());
+    listInfo = dirIt.next().getInfo();
+    listExpiration = listInfo.getExpiration().getAbsoluteMillis()
+        - new Date().getTime();
+    assertTrue("Unexpected relative expiry " + listExpiration
+        + " expected approximately " + poolExpiration/2,
+        Math.abs(poolExpiration/2 - listExpiration) < 10*1000);
+    // Test that cache pool and directive expiry can be modified back to never
+    dfs.modifyCachePool(destPool
+        .setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER));
+    poolIt = dfs.listCachePools();
+    listPool = poolIt.next().getInfo();
+    while (!listPool.getPoolName().equals(destPool.getPoolName())) {
+      listPool = poolIt.next().getInfo();
+    }
+    assertEquals("Expected max relative expiry to match set value",
+        CachePoolInfo.RELATIVE_EXPIRY_NEVER,
+        listPool.getMaxRelativeExpiryMs().longValue());
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
+        .setId(listInfo.getId())
+        .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER))
+        .build());
+    // Test modifying close to the limit
+    dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
+        .setId(listInfo.getId())
+        .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER - 1))
+        .build());
+  }
+
+  @Test(timeout=60000)
+  public void testExceedsCapacity() throws Exception {
+    // Create a giant file
+    final Path fileName = new Path("/exceeds");
+    final long fileLen = CACHE_CAPACITY * (NUM_DATANODES*2);
+    int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE);
+    DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES,
+        0xFADED);
+    // Set up a log appender watcher
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
+        .setPath(fileName).setReplication((short) 1).build());
+    waitForCachedBlocks(namenode, -1, numCachedReplicas,
+        "testExceeds:1");
+    // Check that no DNs saw an excess CACHE message
+    int lines = appender.countLinesWithMessage(
+        "more bytes in the cache: " +
+        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+    assertEquals("Namenode should not send extra CACHE commands", 0, lines);
+    // Try creating a file with giant-sized blocks that exceed cache capacity
+    dfs.delete(fileName, false);
+    DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
+        (short) 1, 0xFADED);
+    // Nothing will get cached, so just force sleep for a bit
+    Thread.sleep(4000);
+    // Still should not see any excess commands
+    lines = appender.countLinesWithMessage(
+        "more bytes in the cache: " +
+        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+    assertEquals("Namenode should not send extra CACHE commands", 0, lines);
+  }
+}
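
For context, here is a minimal client-side sketch of the pool-level expiry
behavior the test above exercises. It is illustrative only: the class, pool,
and path names are invented, and an initialized DistributedFileSystem handle
is assumed; only APIs that appear in the test itself are used.

    import java.util.EnumSet;

    import org.apache.hadoop.fs.CacheFlag;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    public class PoolExpirySketch {
      static void demo(DistributedFileSystem dfs) throws Exception {
        // Cap every directive in this pool at a one-hour relative expiry.
        long maxExpiryMs = 60 * 60 * 1000L;
        dfs.addCachePool(new CachePoolInfo("hourly")
            .setMaxRelativeExpiryMs(maxExpiryMs));

        // A directive added without an explicit expiration inherits the
        // pool's maximum, as the listing assertions above verify.
        long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
            .setPool("hourly")
            .setPath(new Path("/data/hot"))
            .build(), EnumSet.of(CacheFlag.FORCE));

        // An explicit expiration must stay within the pool's maximum;
        // newRelative(maxExpiryMs + 1) would be rejected here with an
        // InvalidRequestException, as the add/modify cases above show.
        dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
            .setId(id)
            .setExpiration(Expiration.newRelative(maxExpiryMs / 2))
            .build());
      }
    }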

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Wed Jan 22 21:43:00 2014
@@ -143,7 +143,8 @@ public class TestDeadDatanode {
     StorageReport[] rep = { new StorageReport(
         new DatanodeStorage(reg.getDatanodeUuid()),
         false, 0, 0, 0, 0) };
-    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands();
+    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
+      .getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());
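
The widened sendHeartbeat call above inserts two long arguments between the
storage reports and the existing int counters. Judging by the
CacheUsed/CacheCapacity MXBean attributes and the cacheCapacity/cacheUsed
JSON fields added further down in this patch, they carry the datanode's cache
capacity and cache used; the annotations in this sketch are that inference,
not parameter names copied from DatanodeProtocol.

    // Annotated shape of the widened heartbeat (argument roles inferred):
    DatanodeCommand[] cmd = dnp.sendHeartbeat(
        reg,   // DatanodeRegistration
        rep,   // StorageReport[]
        0L,    // cache capacity in bytes (new)
        0L,    // cache used in bytes (new)
        0,     // transfers in progress
        0,     // xceiver count
        0      // failed volumes
        ).getCommands();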

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java Wed Jan 22 21:43:00 2014
@@ -31,7 +31,10 @@ import javax.management.ObjectName;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 import org.mortbay.util.ajax.JSON;
@@ -46,10 +49,16 @@ public class TestNameNodeMXBean {
    */
   private static final double DELTA = 0.000001;
 
+  static {
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+  }
+
   @SuppressWarnings({ "unchecked" })
   @Test
   public void testNameNodeMXBeanInfo() throws Exception {
     Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+      NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
     MiniDFSCluster cluster = null;
 
     try {
@@ -171,6 +180,10 @@ public class TestNameNodeMXBean {
       }
       assertEquals(1, statusMap.get("active").size());
       assertEquals(1, statusMap.get("failed").size());
+      assertEquals(0L, mbs.getAttribute(mxbeanName, "CacheUsed"));
+      assertEquals(NativeIO.POSIX.getCacheManipulator().getMemlockLimit() * 
+          cluster.getDataNodes().size(),
+              mbs.getAttribute(mxbeanName, "CacheCapacity"));
     } finally {
       if (cluster != null) {
         for (URI dir : cluster.getNameDirs(0)) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java Wed Jan 22 21:43:00 2014
@@ -414,7 +414,7 @@ public class TestNamenodeRetryCache {
     
     LightWeightCache<CacheEntry, CacheEntry> cacheSet = 
         (LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(20, cacheSet.size());
     
     Map<CacheEntry, CacheEntry> oldEntries = 
         new HashMap<CacheEntry, CacheEntry>();
@@ -433,7 +433,7 @@ public class TestNamenodeRetryCache {
     assertTrue(namesystem.hasRetryCache());
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(20, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java Wed Jan 22 21:43:00 2014
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.LinkedList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -59,6 +60,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 /**
  * Tests state transition from active->standby, and manual failover
  * and failback between two namenodes.
@@ -124,6 +127,17 @@ public class TestHAStateTransitions {
     }
   }
 
+  private void addCrmThreads(MiniDFSCluster cluster,
+      LinkedList<Thread> crmThreads) {
+    for (int nn = 0; nn <= 1; nn++) {
+      Thread thread = cluster.getNameNode(nn).getNamesystem().
+          getCacheManager().getCacheReplicationMonitor();
+      if (thread != null) {
+        crmThreads.add(thread);
+      }
+    }
+  }
+
   /**
    * Test that transitioning a service to the state that it is already
    * in is a nop, specifically, an exception is not thrown.
@@ -131,19 +145,30 @@ public class TestHAStateTransitions {
   @Test
   public void testTransitionToCurrentStateIsANop() throws Exception {
     Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(1)
       .build();
+    LinkedList<Thread> crmThreads = new LinkedList<Thread>();
     try {
       cluster.waitActive();
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
     } finally {
       cluster.shutdown();
     }
+    // Verify that all cacheReplicationMonitor threads shut down
+    for (Thread thread : crmThreads) {
+      Uninterruptibles.joinUninterruptibly(thread);
+    }
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Wed Jan 22 21:43:00 2014
@@ -29,6 +29,7 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
@@ -37,11 +38,13 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,12 +56,16 @@ import org.apache.hadoop.hdfs.MiniDFSNNT
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -82,6 +89,7 @@ public class TestRetryCacheWithHA {
   private static final int BlockSize = 1024;
   private static final short DataNodes = 3;
   private static final int CHECKTIMES = 10;
+  private static final int ResponseSize = 3;
   
   private MiniDFSCluster cluster;
   private DistributedFileSystem dfs;
@@ -116,6 +124,8 @@ public class TestRetryCacheWithHA {
   @Before
   public void setup() throws Exception {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize);
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(DataNodes).build();
@@ -147,7 +157,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn0 = cluster.getNamesystem(0);
     LightWeightCache<CacheEntry, CacheEntry> cacheSet = 
         (LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(20, cacheSet.size());
     
     Map<CacheEntry, CacheEntry> oldEntries = 
         new HashMap<CacheEntry, CacheEntry>();
@@ -168,7 +178,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn1 = cluster.getNamesystem(1);
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(20, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
@@ -734,6 +744,263 @@ public class TestRetryCacheWithHA {
     }
   }
   
+  /** addCacheDirective */
+  class AddCacheDirectiveInfoOp extends AtMostOnceOp {
+    private CacheDirectiveInfo directive;
+    private Long result;
+
+    AddCacheDirectiveInfoOp(DFSClient client,
+        CacheDirectiveInfo directive) {
+      super("addCacheDirective", client);
+      this.directive = directive;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CacheDirectiveEntry> iter =
+            dfs.listCacheDirectives(
+                new CacheDirectiveInfo.Builder().
+                    setPool(directive.getPool()).
+                    setPath(directive.getPath()).
+                    build());
+        if (iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return result;
+    }
+  }
+
+  /** modifyCacheDirective */
+  class ModifyCacheDirectiveInfoOp extends AtMostOnceOp {
+    private final CacheDirectiveInfo directive;
+    private final short newReplication;
+    private long id;
+
+    ModifyCacheDirectiveInfoOp(DFSClient client,
+        CacheDirectiveInfo directive, short newReplication) {
+      super("modifyCacheDirective", client);
+      this.directive = directive;
+      this.newReplication = newReplication;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.modifyCacheDirective(
+          new CacheDirectiveInfo.Builder().
+              setId(id).
+              setReplication(newReplication).
+              build(), EnumSet.of(CacheFlag.FORCE));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CacheDirectiveEntry> iter =
+            dfs.listCacheDirectives(
+                new CacheDirectiveInfo.Builder().
+                    setPool(directive.getPool()).
+                    setPath(directive.getPath()).
+                    build());
+        while (iter.hasNext()) {
+          CacheDirectiveInfo result = iter.next().getInfo();
+          if ((result.getId() == id) &&
+              (result.getReplication().shortValue() == newReplication)) {
+            return true;
+          }
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** removeCacheDirective */
+  class RemoveCacheDirectiveInfoOp extends AtMostOnceOp {
+    private CacheDirectiveInfo directive;
+    private long id;
+
+    RemoveCacheDirectiveInfoOp(DFSClient client, String pool,
+        String path) {
+      super("removeCacheDirective", client);
+      this.directive = new CacheDirectiveInfo.Builder().
+          setPool(pool).
+          setPath(new Path(path)).
+          build();
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removeCacheDirective(id);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CacheDirectiveEntry> iter =
+            dfs.listCacheDirectives(
+                new CacheDirectiveInfo.Builder().
+                  setPool(directive.getPool()).
+                  setPath(directive.getPath()).
+                  build());
+        if (!iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** addCachePool */
+  class AddCachePoolOp extends AtMostOnceOp {
+    private String pool;
+
+    AddCachePoolOp(DFSClient client, String pool) {
+      super("addCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
+        if (iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** modifyCachePool */
+  class ModifyCachePoolOp extends AtMostOnceOp {
+    String pool;
+
+    ModifyCachePoolOp(DFSClient client, String pool) {
+      super("modifyCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool).setLimit(10L));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.modifyCachePool(new CachePoolInfo(pool).setLimit(99L));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
+        if (iter.hasNext() && (long)iter.next().getInfo().getLimit() == 99) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** removeCachePool */
+  class RemoveCachePoolOp extends AtMostOnceOp {
+    private String pool;
+
+    RemoveCachePoolOp(DFSClient client, String pool) {
+      super("removeCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removeCachePool(pool);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
+        if (!iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
   @Test (timeout=60000)
   public void testCreateSnapshot() throws Exception {
     final DFSClient client = genClientWithDummyHandler();
@@ -811,6 +1078,58 @@ public class TestRetryCacheWithHA {
     testClientRetryWithFailover(op);
   }
   
+  @Test (timeout=60000)
+  public void testAddCacheDirectiveInfo() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AddCacheDirectiveInfoOp(client, 
+        new CacheDirectiveInfo.Builder().
+            setPool("pool").
+            setPath(new Path("/path")).
+            build());
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testModifyCacheDirectiveInfo() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new ModifyCacheDirectiveInfoOp(client, 
+        new CacheDirectiveInfo.Builder().
+            setPool("pool").
+            setPath(new Path("/path")).
+            setReplication((short)1).build(),
+        (short)555);
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemoveCacheDescriptor() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemoveCacheDirectiveInfoOp(client, "pool",
+        "/path");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testAddCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AddCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testModifyCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new ModifyCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemoveCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
   /**
   * When NN failover happens, if the client does not receive the response and
   * sends a retry request to the other NN, the same response should be received
@@ -863,4 +1182,92 @@ public class TestRetryCacheWithHA {
           + results.get(op.name));
     }
   }
+
+  /**
+   * Add a list of cache pools, list cache pools,
+   * switch active NN, and list cache pools again.
+   */
+  @Test (timeout=60000)
+  public void testListCachePools() throws Exception {
+    final int poolCount = 7;
+    HashSet<String> poolNames = new HashSet<String>(poolCount);
+    for (int i=0; i<poolCount; i++) {
+      String poolName = "testListCachePools-" + i;
+      dfs.addCachePool(new CachePoolInfo(poolName));
+      poolNames.add(poolName);
+    }
+    listCachePools(poolNames, 0);
+
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+    cluster.waitActive(1);
+    listCachePools(poolNames, 1);
+  }
+
+  /**
+   * Add a list of cache directives, list cache directives,
+   * switch active NN, and list cache directives again.
+   */
+  @Test (timeout=60000)
+  public void testListCacheDirectives() throws Exception {
+    final int poolCount = 7;
+    HashSet<String> poolNames = new HashSet<String>(poolCount);
+    Path path = new Path("/p");
+    for (int i=0; i<poolCount; i++) {
+      String poolName = "testListCacheDirectives-" + i;
+      CacheDirectiveInfo directiveInfo =
+        new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
+      dfs.addCachePool(new CachePoolInfo(poolName));
+      dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
+      poolNames.add(poolName);
+    }
+    listCacheDirectives(poolNames, 0);
+
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+    cluster.waitActive(1);
+    listCacheDirectives(poolNames, 1);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void listCachePools(
+      HashSet<String> poolNames, int active) throws Exception {
+    HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
+    RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
+    int poolCount = poolNames.size();
+    for (int i=0; i<poolCount; i++) {
+      CachePoolEntry pool = pools.next();
+      String poolName = pool.getInfo().getPoolName();
+      assertTrue("The pool name should be expected", tmpNames.remove(poolName));
+      if (i % 2 == 0) {
+        int standby = active;
+        active = (standby == 0) ? 1 : 0;
+        cluster.transitionToStandby(standby);
+        cluster.transitionToActive(active);
+        cluster.waitActive(active);
+      }
+    }
+    assertTrue("All pools must be found", tmpNames.isEmpty());
+  }
+
+  @SuppressWarnings("unchecked")
+  private void listCacheDirectives(
+      HashSet<String> poolNames, int active) throws Exception {
+    HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
+    RemoteIterator<CacheDirectiveEntry> directives = dfs.listCacheDirectives(null);
+    int poolCount = poolNames.size();
+    for (int i=0; i<poolCount; i++) {
+      CacheDirectiveEntry directive = directives.next();
+      String poolName = directive.getInfo().getPool();
+      assertTrue("The pool name should be expected", tmpNames.remove(poolName));
+      if (i % 2 == 0) {
+        int standby = active;
+        active = (standby == 0) ? 1 : 0;
+        cluster.transitionToStandby(standby);
+        cluster.transitionToActive(active);
+        cluster.waitActive(active);
+      }
+    }
+    assertTrue("All pools must be found", tmpNames.isEmpty());
+  }
 }
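
The listing tests above run with both DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES
and DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES set to ResponseSize (3), so
iterating seven pools or directives spans several paged RPCs, and the
mid-iteration failovers verify that a listing survives a NameNode switch. The
retry-cache size assertions moving from 14 to 20 are likewise consistent with
the six at-most-once cache operations exercised here. A minimal consumer
sketch, assuming an initialized DistributedFileSystem and an invented class
name:

    import java.io.IOException;

    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CachePoolEntry;

    public class ListPoolsSketch {
      // Each hasNext()/next() may fetch another page of results from the
      // active NameNode, which is why the tests above flip the active NN
      // between next() calls.
      static void printPools(DistributedFileSystem dfs) throws IOException {
        RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
        while (it.hasNext()) {
          System.out.println(it.next().getInfo().getPoolName());
        }
      }
    }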

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java Wed Jan 22 21:43:00 2014
@@ -80,6 +80,8 @@ public class TestJsonUtil {
     response.put("xceiverCount", 4096l);
     response.put("networkLocation", "foo.bar.baz");
     response.put("adminState", "NORMAL");
+    response.put("cacheCapacity", 123l);
+    response.put("cacheUsed", 321l);
     
     JsonUtil.toDatanodeInfo(response);
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
Binary files - no diff available.


