Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1908E10F9C for ; Wed, 22 Jan 2014 21:44:07 +0000 (UTC) Received: (qmail 87537 invoked by uid 500); 22 Jan 2014 21:44:02 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 87395 invoked by uid 500); 22 Jan 2014 21:44:01 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 87350 invoked by uid 99); 22 Jan 2014 21:44:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jan 2014 21:44:00 +0000 X-ASF-Spam-Status: No, hits=-1998.0 required=5.0 tests=ALL_TRUSTED,FB_GET_MEDS X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jan 2014 21:43:56 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B75322388C2A; Wed, 22 Jan 2014 21:43:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1560522 [8/9] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/m... Date: Wed, 22 Jan 2014 21:43:04 -0000 To: hdfs-commits@hadoop.apache.org From: wang@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140122214309.B75322388C2A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Wed Jan 22 21:43:00 2014 @@ -956,8 +956,8 @@ public class NNThroughputBenchmark imple // TODO:FEDERATION currently a single block pool is supported StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; - DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - rep, 0, 0, 0).getCommands(); + DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep, + 0L, 0L, 0, 0, 0).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -1004,7 +1004,7 @@ public class NNThroughputBenchmark imple StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - rep, 0, 0, 0).getCommands(); + rep, 0L, 0L, 0, 0, 0).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Jan 22 21:43:00 2014 @@ -114,7 +114,7 @@ public class NameNodeAdapter { DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException { return namesystem.handleHeartbeat(nodeReg, BlockManagerTestUtil.getStorageReportsForDatanode(dd), - 0, 0, 0); + dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0); } public static boolean setReplication(final FSNamesystem ns, Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1560522&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Wed Jan 22 21:43:00 2014 @@ -0,0 +1,1393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; +import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Date; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang.time.DateUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.LogVerificationAppender; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolStats; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.GSet; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Supplier; + +public class TestCacheDirectives { + static final Log LOG = LogFactory.getLog(TestCacheDirectives.class); + + private static final UserGroupInformation unprivilegedUser = + UserGroupInformation.createRemoteUser("unprivilegedUser"); + + static private Configuration conf; + static private MiniDFSCluster cluster; + static private DistributedFileSystem dfs; + static private NamenodeProtocols proto; + static private NameNode namenode; + static private CacheManipulator prevCacheManipulator; + + static { + NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(false); + } + + private static final long BLOCK_SIZE = 4096; + private static final int NUM_DATANODES = 4; + // Most Linux installs will allow non-root users to lock 64KB. + // In this test though, we stub out mlock so this doesn't matter. + private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES; + + private static HdfsConfiguration createCachingConf() { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000); + conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000); + // set low limits here for testing purposes + conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, + 2); + + return conf; + } + + @Before + public void setup() throws Exception { + conf = createCachingConf(); + cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + proto = cluster.getNameNodeRpc(); + namenode = cluster.getNameNode(); + prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); + NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); + LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(CacheManager.class.getName()).setLevel( + Level.TRACE); + } + + @After + public void teardown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + // Restore the original CacheManipulator + NativeIO.POSIX.setCacheManipulator(prevCacheManipulator); + } + + @Test(timeout=60000) + public void testBasicPoolOperations() throws Exception { + final String poolName = "pool1"; + CachePoolInfo info = new CachePoolInfo(poolName). + setOwnerName("bob").setGroupName("bobgroup"). + setMode(new FsPermission((short)0755)).setLimit(150l); + + // Add a pool + dfs.addCachePool(info); + + // Do some bad addCachePools + try { + dfs.addCachePool(info); + fail("added the pool with the same name twice"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("pool1 already exists", ioe); + } + try { + dfs.addCachePool(new CachePoolInfo("")); + fail("added empty pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + dfs.addCachePool(null); + fail("added null pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe); + } + try { + proto.addCachePool(new CachePoolInfo("")); + fail("added empty pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + proto.addCachePool(null); + fail("added null pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe); + } + + // Modify the pool + info.setOwnerName("jane").setGroupName("janegroup") + .setMode(new FsPermission((short)0700)).setLimit(314l); + dfs.modifyCachePool(info); + + // Do some invalid modify pools + try { + dfs.modifyCachePool(new CachePoolInfo("fool")); + fail("modified non-existent cache pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("fool does not exist", ioe); + } + try { + dfs.modifyCachePool(new CachePoolInfo("")); + fail("modified empty pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + dfs.modifyCachePool(null); + fail("modified null pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe); + } + try { + proto.modifyCachePool(new CachePoolInfo("")); + fail("modified empty pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + proto.modifyCachePool(null); + fail("modified null pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe); + } + + // Remove the pool + dfs.removeCachePool(poolName); + // Do some bad removePools + try { + dfs.removeCachePool("pool99"); + fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Cannot remove " + + "non-existent cache pool", ioe); + } + try { + dfs.removeCachePool(poolName); + fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Cannot remove " + + "non-existent cache pool", ioe); + } + try { + dfs.removeCachePool(""); + fail("removed empty pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + dfs.removeCachePool(null); + fail("removed null pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + proto.removeCachePool(""); + fail("removed empty pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + try { + proto.removeCachePool(null); + fail("removed null pool"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("invalid empty cache pool name", + ioe); + } + + info = new CachePoolInfo("pool2"); + dfs.addCachePool(info); + } + + @Test(timeout=60000) + public void testCreateAndModifyPools() throws Exception { + String poolName = "pool1"; + String ownerName = "abc"; + String groupName = "123"; + FsPermission mode = new FsPermission((short)0755); + long limit = 150; + dfs.addCachePool(new CachePoolInfo(poolName). + setOwnerName(ownerName).setGroupName(groupName). + setMode(mode).setLimit(limit)); + + RemoteIterator iter = dfs.listCachePools(); + CachePoolInfo info = iter.next().getInfo(); + assertEquals(poolName, info.getPoolName()); + assertEquals(ownerName, info.getOwnerName()); + assertEquals(groupName, info.getGroupName()); + + ownerName = "def"; + groupName = "456"; + mode = new FsPermission((short)0700); + limit = 151; + dfs.modifyCachePool(new CachePoolInfo(poolName). + setOwnerName(ownerName).setGroupName(groupName). + setMode(mode).setLimit(limit)); + + iter = dfs.listCachePools(); + info = iter.next().getInfo(); + assertEquals(poolName, info.getPoolName()); + assertEquals(ownerName, info.getOwnerName()); + assertEquals(groupName, info.getGroupName()); + assertEquals(mode, info.getMode()); + assertEquals(limit, (long)info.getLimit()); + + dfs.removeCachePool(poolName); + iter = dfs.listCachePools(); + assertFalse("expected no cache pools after deleting pool", iter.hasNext()); + + proto.listCachePools(null); + + try { + proto.removeCachePool("pool99"); + fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Cannot remove non-existent", + ioe); + } + try { + proto.removeCachePool(poolName); + fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Cannot remove non-existent", + ioe); + } + + iter = dfs.listCachePools(); + assertFalse("expected no cache pools after deleting pool", iter.hasNext()); + } + + private static void validateListAll( + RemoteIterator iter, + Long... ids) throws Exception { + for (Long id: ids) { + assertTrue("Unexpectedly few elements", iter.hasNext()); + assertEquals("Unexpected directive ID", id, + iter.next().getInfo().getId()); + } + assertFalse("Unexpectedly many list elements", iter.hasNext()); + } + + private static long addAsUnprivileged( + final CacheDirectiveInfo directive) throws Exception { + return unprivilegedUser + .doAs(new PrivilegedExceptionAction() { + @Override + public Long run() throws IOException { + DistributedFileSystem myDfs = + (DistributedFileSystem) FileSystem.get(conf); + return myDfs.addCacheDirective(directive); + } + }); + } + + @Test(timeout=60000) + public void testAddRemoveDirectives() throws Exception { + proto.addCachePool(new CachePoolInfo("pool1"). + setMode(new FsPermission((short)0777))); + proto.addCachePool(new CachePoolInfo("pool2"). + setMode(new FsPermission((short)0777))); + proto.addCachePool(new CachePoolInfo("pool3"). + setMode(new FsPermission((short)0777))); + proto.addCachePool(new CachePoolInfo("pool4"). + setMode(new FsPermission((short)0))); + + CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder(). + setPath(new Path("/alpha")). + setPool("pool1"). + build(); + CacheDirectiveInfo beta = new CacheDirectiveInfo.Builder(). + setPath(new Path("/beta")). + setPool("pool2"). + build(); + CacheDirectiveInfo delta = new CacheDirectiveInfo.Builder(). + setPath(new Path("/delta")). + setPool("pool1"). + build(); + + long alphaId = addAsUnprivileged(alpha); + long alphaId2 = addAsUnprivileged(alpha); + assertFalse("Expected to get unique directives when re-adding an " + + "existing CacheDirectiveInfo", + alphaId == alphaId2); + long betaId = addAsUnprivileged(beta); + + try { + addAsUnprivileged(new CacheDirectiveInfo.Builder(). + setPath(new Path("/unicorn")). + setPool("no_such_pool"). + build()); + fail("expected an error when adding to a non-existent pool."); + } catch (InvalidRequestException ioe) { + GenericTestUtils.assertExceptionContains("Unknown pool", ioe); + } + + try { + addAsUnprivileged(new CacheDirectiveInfo.Builder(). + setPath(new Path("/blackhole")). + setPool("pool4"). + build()); + fail("expected an error when adding to a pool with " + + "mode 0 (no permissions for anyone)."); + } catch (AccessControlException e) { + GenericTestUtils. + assertExceptionContains("Permission denied while accessing pool", e); + } + + try { + addAsUnprivileged(new CacheDirectiveInfo.Builder(). + setPath(new Path("/illegal:path/")). + setPool("pool1"). + build()); + fail("expected an error when adding a malformed path " + + "to the cache directives."); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("is not a valid DFS filename", e); + } + + try { + addAsUnprivileged(new CacheDirectiveInfo.Builder(). + setPath(new Path("/emptypoolname")). + setReplication((short)1). + setPool(""). + build()); + fail("expected an error when adding a cache " + + "directive with an empty pool name."); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("Invalid empty pool name", e); + } + + long deltaId = addAsUnprivileged(delta); + + // We expect the following to succeed, because DistributedFileSystem + // qualifies the path. + long relativeId = addAsUnprivileged( + new CacheDirectiveInfo.Builder(). + setPath(new Path("relative")). + setPool("pool1"). + build()); + + RemoteIterator iter; + iter = dfs.listCacheDirectives(null); + validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId ); + iter = dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder().setPool("pool3").build()); + assertFalse(iter.hasNext()); + iter = dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder().setPool("pool1").build()); + validateListAll(iter, alphaId, alphaId2, deltaId, relativeId ); + iter = dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder().setPool("pool2").build()); + validateListAll(iter, betaId); + + dfs.removeCacheDirective(betaId); + iter = dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder().setPool("pool2").build()); + assertFalse(iter.hasNext()); + + try { + dfs.removeCacheDirective(betaId); + fail("expected an error when removing a non-existent ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("No directive with ID", e); + } + + try { + proto.removeCacheDirective(-42l); + fail("expected an error when removing a negative ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains( + "Invalid negative ID", e); + } + try { + proto.removeCacheDirective(43l); + fail("expected an error when removing a non-existent ID"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("No directive with ID", e); + } + + dfs.removeCacheDirective(alphaId); + dfs.removeCacheDirective(alphaId2); + dfs.removeCacheDirective(deltaId); + + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(). + setId(relativeId). + setReplication((short)555). + build()); + iter = dfs.listCacheDirectives(null); + assertTrue(iter.hasNext()); + CacheDirectiveInfo modified = iter.next().getInfo(); + assertEquals(relativeId, modified.getId().longValue()); + assertEquals((short)555, modified.getReplication().shortValue()); + dfs.removeCacheDirective(relativeId); + iter = dfs.listCacheDirectives(null); + assertFalse(iter.hasNext()); + + // Verify that PBCDs with path "." work correctly + CacheDirectiveInfo directive = + new CacheDirectiveInfo.Builder().setPath(new Path(".")) + .setPool("pool1").build(); + long id = dfs.addCacheDirective(directive); + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder( + directive).setId(id).setReplication((short)2).build()); + dfs.removeCacheDirective(id); + } + + @Test(timeout=60000) + public void testCacheManagerRestart() throws Exception { + // Create and validate a pool + final String pool = "poolparty"; + String groupName = "partygroup"; + FsPermission mode = new FsPermission((short)0777); + long limit = 747; + dfs.addCachePool(new CachePoolInfo(pool) + .setGroupName(groupName) + .setMode(mode) + .setLimit(limit)); + RemoteIterator pit = dfs.listCachePools(); + assertTrue("No cache pools found", pit.hasNext()); + CachePoolInfo info = pit.next().getInfo(); + assertEquals(pool, info.getPoolName()); + assertEquals(groupName, info.getGroupName()); + assertEquals(mode, info.getMode()); + assertEquals(limit, (long)info.getLimit()); + assertFalse("Unexpected # of cache pools found", pit.hasNext()); + + // Create some cache entries + int numEntries = 10; + String entryPrefix = "/party-"; + long prevId = -1; + final Date expiry = new Date(); + for (int i=0; i dit + = dfs.listCacheDirectives(null); + for (int i=0; i() { + @Override + public Boolean get() { + int numCachedBlocks = 0, numCachedReplicas = 0; + namesystem.readLock(); + try { + GSet cachedBlocks = + cacheManager.getCachedBlocks(); + if (cachedBlocks != null) { + for (Iterator iter = cachedBlocks.iterator(); + iter.hasNext(); ) { + CachedBlock cachedBlock = iter.next(); + numCachedBlocks++; + numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size(); + } + } + } finally { + namesystem.readUnlock(); + } + if (expectedCachedBlocks == -1 || + numCachedBlocks == expectedCachedBlocks) { + if (expectedCachedReplicas == -1 || + numCachedReplicas == expectedCachedReplicas) { + return true; + } + } + LOG.info(logString + " cached blocks: have " + numCachedBlocks + + " / " + expectedCachedBlocks + ". " + + "cached replicas: have " + numCachedReplicas + + " / " + expectedCachedReplicas); + return false; + } + }, 500, 60000); + } + + private static void waitForCacheDirectiveStats(final DistributedFileSystem dfs, + final long targetBytesNeeded, final long targetBytesCached, + final long targetFilesNeeded, final long targetFilesCached, + final CacheDirectiveInfo filter, final String infoString) + throws Exception { + LOG.info("Polling listCacheDirectives " + + ((filter == null) ? "ALL" : filter.toString()) + " for " + + targetBytesNeeded + " targetBytesNeeded, " + + targetBytesCached + " targetBytesCached, " + + targetFilesNeeded + " targetFilesNeeded, " + + targetFilesCached + " targetFilesCached"); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + RemoteIterator iter = null; + CacheDirectiveEntry entry = null; + try { + iter = dfs.listCacheDirectives(filter); + entry = iter.next(); + } catch (IOException e) { + fail("got IOException while calling " + + "listCacheDirectives: " + e.getMessage()); + } + Assert.assertNotNull(entry); + CacheDirectiveStats stats = entry.getStats(); + if ((targetBytesNeeded == stats.getBytesNeeded()) && + (targetBytesCached == stats.getBytesCached()) && + (targetFilesNeeded == stats.getFilesNeeded()) && + (targetFilesCached == stats.getFilesCached())) { + return true; + } else { + LOG.info(infoString + ": " + + "filesNeeded: " + + stats.getFilesNeeded() + "/" + targetFilesNeeded + + ", filesCached: " + + stats.getFilesCached() + "/" + targetFilesCached + + ", bytesNeeded: " + + stats.getBytesNeeded() + "/" + targetBytesNeeded + + ", bytesCached: " + + stats.getBytesCached() + "/" + targetBytesCached); + return false; + } + } + }, 500, 60000); + } + + private static void waitForCachePoolStats(final DistributedFileSystem dfs, + final long targetBytesNeeded, final long targetBytesCached, + final long targetFilesNeeded, final long targetFilesCached, + final CachePoolInfo pool, final String infoString) + throws Exception { + LOG.info("Polling listCachePools " + pool.toString() + " for " + + targetBytesNeeded + " targetBytesNeeded, " + + targetBytesCached + " targetBytesCached, " + + targetFilesNeeded + " targetFilesNeeded, " + + targetFilesCached + " targetFilesCached"); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + RemoteIterator iter = null; + try { + iter = dfs.listCachePools(); + } catch (IOException e) { + fail("got IOException while calling " + + "listCachePools: " + e.getMessage()); + } + while (true) { + CachePoolEntry entry = null; + try { + if (!iter.hasNext()) { + break; + } + entry = iter.next(); + } catch (IOException e) { + fail("got IOException while iterating through " + + "listCachePools: " + e.getMessage()); + } + if (entry == null) { + break; + } + if (!entry.getInfo().getPoolName().equals(pool.getPoolName())) { + continue; + } + CachePoolStats stats = entry.getStats(); + if ((targetBytesNeeded == stats.getBytesNeeded()) && + (targetBytesCached == stats.getBytesCached()) && + (targetFilesNeeded == stats.getFilesNeeded()) && + (targetFilesCached == stats.getFilesCached())) { + return true; + } else { + LOG.info(infoString + ": " + + "filesNeeded: " + + stats.getFilesNeeded() + "/" + targetFilesNeeded + + ", filesCached: " + + stats.getFilesCached() + "/" + targetFilesCached + + ", bytesNeeded: " + + stats.getBytesNeeded() + "/" + targetBytesNeeded + + ", bytesCached: " + + stats.getBytesCached() + "/" + targetBytesCached); + return false; + } + } + return false; + } + }, 500, 60000); + } + + private static void checkNumCachedReplicas(final DistributedFileSystem dfs, + final List paths, final int expectedBlocks, + final int expectedReplicas) + throws Exception { + int numCachedBlocks = 0; + int numCachedReplicas = 0; + for (Path p: paths) { + final FileStatus f = dfs.getFileStatus(p); + final long len = f.getLen(); + final long blockSize = f.getBlockSize(); + // round it up to full blocks + final long numBlocks = (len + blockSize - 1) / blockSize; + BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len); + assertEquals("Unexpected number of block locations for path " + p, + numBlocks, locs.length); + for (BlockLocation l: locs) { + if (l.getCachedHosts().length > 0) { + numCachedBlocks++; + } + numCachedReplicas += l.getCachedHosts().length; + } + } + LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks"); + LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas + + " replicas"); + assertEquals("Unexpected number of cached blocks", expectedBlocks, + numCachedBlocks); + assertEquals("Unexpected number of cached replicas", expectedReplicas, + numCachedReplicas); + } + + @Test(timeout=120000) + public void testWaitForCachedReplicas() throws Exception { + FileSystemTestHelper helper = new FileSystemTestHelper(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return ((namenode.getNamesystem().getCacheCapacity() == + (NUM_DATANODES * CACHE_CAPACITY)) && + (namenode.getNamesystem().getCacheUsed() == 0)); + } + }, 500, 60000); + + // Send a cache report referring to a bogus block. It is important that + // the NameNode be robust against this. + NamenodeProtocols nnRpc = namenode.getRpcServer(); + DataNode dn0 = cluster.getDataNodes().get(0); + String bpid = cluster.getNamesystem().getBlockPoolId(); + LinkedList bogusBlockIds = new LinkedList (); + bogusBlockIds.add(999999L); + nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds); + + Path rootDir = helper.getDefaultWorkingDirectory(dfs); + // Create the pool + final String pool = "friendlyPool"; + nnRpc.addCachePool(new CachePoolInfo("friendlyPool")); + // Create some test files + final int numFiles = 2; + final int numBlocksPerFile = 2; + final List paths = new ArrayList(numFiles); + for (int i=0; i entries = + new CacheDirectiveIterator(nnRpc, null); + for (int i=0; i paths = new LinkedList(); + paths.add(new Path("/foo/bar")); + paths.add(new Path("/foo/baz")); + paths.add(new Path("/foo2/bar2")); + paths.add(new Path("/foo2/baz2")); + dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault()); + dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault()); + final int numBlocksPerFile = 2; + for (Path path : paths) { + FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile, + (int)BLOCK_SIZE, (short)3, false); + } + waitForCachedBlocks(namenode, 0, 0, + "testWaitForCachedReplicasInDirectory:0"); + + // cache entire directory + long id = dfs.addCacheDirective( + new CacheDirectiveInfo.Builder(). + setPath(new Path("/foo")). + setReplication((short)2). + setPool(pool). + build()); + waitForCachedBlocks(namenode, 4, 8, + "testWaitForCachedReplicasInDirectory:1:blocks"); + // Verify that listDirectives gives the stats we want. + waitForCacheDirectiveStats(dfs, + 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE, + 2, 2, + new CacheDirectiveInfo.Builder(). + setPath(new Path("/foo")). + build(), + "testWaitForCachedReplicasInDirectory:1:directive"); + waitForCachePoolStats(dfs, + 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE, + 2, 2, + poolInfo, "testWaitForCachedReplicasInDirectory:1:pool"); + + long id2 = dfs.addCacheDirective( + new CacheDirectiveInfo.Builder(). + setPath(new Path("/foo/bar")). + setReplication((short)4). + setPool(pool). + build()); + // wait for an additional 2 cached replicas to come up + waitForCachedBlocks(namenode, 4, 10, + "testWaitForCachedReplicasInDirectory:2:blocks"); + // the directory directive's stats are unchanged + waitForCacheDirectiveStats(dfs, + 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE, + 2, 2, + new CacheDirectiveInfo.Builder(). + setPath(new Path("/foo")). + build(), + "testWaitForCachedReplicasInDirectory:2:directive-1"); + // verify /foo/bar's stats + waitForCacheDirectiveStats(dfs, + 4 * numBlocksPerFile * BLOCK_SIZE, + // only 3 because the file only has 3 replicas, not 4 as requested. + 3 * numBlocksPerFile * BLOCK_SIZE, + 1, + // only 0 because the file can't be fully cached + 0, + new CacheDirectiveInfo.Builder(). + setPath(new Path("/foo/bar")). + build(), + "testWaitForCachedReplicasInDirectory:2:directive-2"); + waitForCachePoolStats(dfs, + (4+4) * numBlocksPerFile * BLOCK_SIZE, + (4+3) * numBlocksPerFile * BLOCK_SIZE, + 3, 2, + poolInfo, "testWaitForCachedReplicasInDirectory:2:pool"); + // remove and watch numCached go to 0 + dfs.removeCacheDirective(id); + dfs.removeCacheDirective(id2); + waitForCachedBlocks(namenode, 0, 0, + "testWaitForCachedReplicasInDirectory:3:blocks"); + waitForCachePoolStats(dfs, + 0, 0, + 0, 0, + poolInfo, "testWaitForCachedReplicasInDirectory:3:pool"); + } + + /** + * Tests stepping the cache replication factor up and down, checking the + * number of cached replicas and blocks as well as the advertised locations. + * @throws Exception + */ + @Test(timeout=120000) + public void testReplicationFactor() throws Exception { + // Create the pool + final String pool = "friendlyPool"; + dfs.addCachePool(new CachePoolInfo(pool)); + // Create some test files + final List paths = new LinkedList(); + paths.add(new Path("/foo/bar")); + paths.add(new Path("/foo/baz")); + paths.add(new Path("/foo2/bar2")); + paths.add(new Path("/foo2/baz2")); + dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault()); + dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault()); + final int numBlocksPerFile = 2; + for (Path path : paths) { + FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile, + (int)BLOCK_SIZE, (short)3, false); + } + waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0"); + checkNumCachedReplicas(dfs, paths, 0, 0); + // cache directory + long id = dfs.addCacheDirective( + new CacheDirectiveInfo.Builder(). + setPath(new Path("/foo")). + setReplication((short)1). + setPool(pool). + build()); + waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1"); + checkNumCachedReplicas(dfs, paths, 4, 4); + // step up the replication factor + for (int i=2; i<=3; i++) { + dfs.modifyCacheDirective( + new CacheDirectiveInfo.Builder(). + setId(id). + setReplication((short)i). + build()); + waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2"); + checkNumCachedReplicas(dfs, paths, 4, 4*i); + } + // step it down + for (int i=2; i>=1; i--) { + dfs.modifyCacheDirective( + new CacheDirectiveInfo.Builder(). + setId(id). + setReplication((short)i). + build()); + waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3"); + checkNumCachedReplicas(dfs, paths, 4, 4*i); + } + // remove and watch numCached go to 0 + dfs.removeCacheDirective(id); + waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4"); + checkNumCachedReplicas(dfs, paths, 0, 0); + } + + @Test(timeout=60000) + public void testListCachePoolPermissions() throws Exception { + final UserGroupInformation myUser = UserGroupInformation + .createRemoteUser("myuser"); + final DistributedFileSystem myDfs = + (DistributedFileSystem)DFSTestUtil.getFileSystemAs(myUser, conf); + final String poolName = "poolparty"; + dfs.addCachePool(new CachePoolInfo(poolName) + .setMode(new FsPermission((short)0700))); + // Should only see partial info + RemoteIterator it = myDfs.listCachePools(); + CachePoolInfo info = it.next().getInfo(); + assertFalse(it.hasNext()); + assertEquals("Expected pool name", poolName, info.getPoolName()); + assertNull("Unexpected owner name", info.getOwnerName()); + assertNull("Unexpected group name", info.getGroupName()); + assertNull("Unexpected mode", info.getMode()); + assertNull("Unexpected limit", info.getLimit()); + // Modify the pool so myuser is now the owner + final long limit = 99; + dfs.modifyCachePool(new CachePoolInfo(poolName) + .setOwnerName(myUser.getShortUserName()) + .setLimit(limit)); + // Should see full info + it = myDfs.listCachePools(); + info = it.next().getInfo(); + assertFalse(it.hasNext()); + assertEquals("Expected pool name", poolName, info.getPoolName()); + assertEquals("Mismatched owner name", myUser.getShortUserName(), + info.getOwnerName()); + assertNotNull("Expected group name", info.getGroupName()); + assertEquals("Mismatched mode", (short) 0700, + info.getMode().toShort()); + assertEquals("Mismatched limit", limit, (long)info.getLimit()); + } + + @Test(timeout=120000) + public void testExpiry() throws Exception { + String pool = "pool1"; + dfs.addCachePool(new CachePoolInfo(pool)); + Path p = new Path("/mypath"); + DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999); + // Expire after test timeout + Date start = new Date(); + Date expiry = DateUtils.addSeconds(start, 120); + final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder() + .setPath(p) + .setPool(pool) + .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry)) + .setReplication((short)2) + .build()); + waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1"); + // Change it to expire sooner + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id) + .setExpiration(Expiration.newRelative(0)).build()); + waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2"); + RemoteIterator it = dfs.listCacheDirectives(null); + CacheDirectiveEntry ent = it.next(); + assertFalse(it.hasNext()); + Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis()); + assertTrue("Directive should have expired", + entryExpiry.before(new Date())); + // Change it back to expire later + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id) + .setExpiration(Expiration.newRelative(120000)).build()); + waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3"); + it = dfs.listCacheDirectives(null); + ent = it.next(); + assertFalse(it.hasNext()); + entryExpiry = new Date(ent.getInfo().getExpiration().getMillis()); + assertTrue("Directive should not have expired", + entryExpiry.after(new Date())); + // Verify that setting a negative TTL throws an error + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id) + .setExpiration(Expiration.newRelative(-1)).build()); + } catch (InvalidRequestException e) { + GenericTestUtils + .assertExceptionContains("Cannot set a negative expiration", e); + } + } + + @Test(timeout=120000) + public void testLimit() throws Exception { + try { + dfs.addCachePool(new CachePoolInfo("poolofnegativity").setLimit(-99l)); + fail("Should not be able to set a negative limit"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("negative", e); + } + final String destiny = "poolofdestiny"; + final Path path1 = new Path("/destiny"); + DFSTestUtil.createFile(dfs, path1, 2*BLOCK_SIZE, (short)1, 0x9494); + // Start off with a limit that is too small + final CachePoolInfo poolInfo = new CachePoolInfo(destiny) + .setLimit(2*BLOCK_SIZE-1); + dfs.addCachePool(poolInfo); + final CacheDirectiveInfo info1 = new CacheDirectiveInfo.Builder() + .setPool(destiny).setPath(path1).build(); + try { + dfs.addCacheDirective(info1); + fail("Should not be able to cache when there is no more limit"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("remaining capacity", e); + } + // Raise the limit up to fit and it should work this time + poolInfo.setLimit(2*BLOCK_SIZE); + dfs.modifyCachePool(poolInfo); + long id1 = dfs.addCacheDirective(info1); + waitForCachePoolStats(dfs, + 2*BLOCK_SIZE, 2*BLOCK_SIZE, + 1, 1, + poolInfo, "testLimit:1"); + // Adding another file, it shouldn't be cached + final Path path2 = new Path("/failure"); + DFSTestUtil.createFile(dfs, path2, BLOCK_SIZE, (short)1, 0x9495); + try { + dfs.addCacheDirective(new CacheDirectiveInfo.Builder() + .setPool(destiny).setPath(path2).build(), + EnumSet.noneOf(CacheFlag.class)); + fail("Should not be able to add another cached file"); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("remaining capacity", e); + } + // Bring the limit down, the first file should get uncached + poolInfo.setLimit(BLOCK_SIZE); + dfs.modifyCachePool(poolInfo); + waitForCachePoolStats(dfs, + 2*BLOCK_SIZE, 0, + 1, 0, + poolInfo, "testLimit:2"); + RemoteIterator it = dfs.listCachePools(); + assertTrue("Expected a cache pool", it.hasNext()); + CachePoolStats stats = it.next().getStats(); + assertEquals("Overlimit bytes should be difference of needed and limit", + BLOCK_SIZE, stats.getBytesOverlimit()); + // Moving a directive to a pool without enough limit should fail + CachePoolInfo inadequate = + new CachePoolInfo("poolofinadequacy").setLimit(BLOCK_SIZE); + dfs.addCachePool(inadequate); + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1) + .setId(id1).setPool(inadequate.getPoolName()).build(), + EnumSet.noneOf(CacheFlag.class)); + } catch(InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("remaining capacity", e); + } + // Succeeds when force=true + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1).setId(id1) + .setPool(inadequate.getPoolName()).build(), + EnumSet.of(CacheFlag.FORCE)); + // Also can add with force=true + dfs.addCacheDirective( + new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName()) + .setPath(path1).build(), EnumSet.of(CacheFlag.FORCE)); + } + + @Test(timeout=30000) + public void testMaxRelativeExpiry() throws Exception { + // Test that negative and really big max expirations can't be set during add + try { + dfs.addCachePool(new CachePoolInfo("failpool").setMaxRelativeExpiryMs(-1l)); + fail("Added a pool with a negative max expiry."); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("negative", e); + } + try { + dfs.addCachePool(new CachePoolInfo("failpool") + .setMaxRelativeExpiryMs(Long.MAX_VALUE - 1)); + fail("Added a pool with too big of a max expiry."); + } catch (InvalidRequestException e) { + GenericTestUtils.assertExceptionContains("too big", e); + } + // Test that setting a max relative expiry on a pool works + CachePoolInfo coolPool = new CachePoolInfo("coolPool"); + final long poolExpiration = 1000 * 60 * 10l; + dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(poolExpiration)); + RemoteIterator poolIt = dfs.listCachePools(); + CachePoolInfo listPool = poolIt.next().getInfo(); + assertFalse("Should only be one pool", poolIt.hasNext()); + assertEquals("Expected max relative expiry to match set value", + poolExpiration, listPool.getMaxRelativeExpiryMs().longValue()); + // Test that negative and really big max expirations can't be modified + try { + dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(-1l)); + fail("Added a pool with a negative max expiry."); + } catch (InvalidRequestException e) { + assertExceptionContains("negative", e); + } + try { + dfs.modifyCachePool(coolPool + .setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER+1)); + fail("Added a pool with too big of a max expiry."); + } catch (InvalidRequestException e) { + assertExceptionContains("too big", e); + } + // Test that adding a directives without an expiration uses the pool's max + CacheDirectiveInfo defaultExpiry = new CacheDirectiveInfo.Builder() + .setPath(new Path("/blah")) + .setPool(coolPool.getPoolName()) + .build(); + dfs.addCacheDirective(defaultExpiry); + RemoteIterator dirIt = + dfs.listCacheDirectives(defaultExpiry); + CacheDirectiveInfo listInfo = dirIt.next().getInfo(); + assertFalse("Should only have one entry in listing", dirIt.hasNext()); + long listExpiration = listInfo.getExpiration().getAbsoluteMillis() + - new Date().getTime(); + assertTrue("Directive expiry should be approximately the pool's max expiry", + Math.abs(listExpiration - poolExpiration) < 10*1000); + // Test that the max is enforced on add for relative and absolute + CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder() + .setPath(new Path("/lolcat")) + .setPool(coolPool.getPoolName()); + try { + dfs.addCacheDirective(builder + .setExpiration(Expiration.newRelative(poolExpiration+1)) + .build()); + fail("Added a directive that exceeds pool's max relative expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("exceeds the max relative expiration", e); + } + try { + dfs.addCacheDirective(builder + .setExpiration(Expiration.newAbsolute( + new Date().getTime() + poolExpiration + (10*1000))) + .build()); + fail("Added a directive that exceeds pool's max relative expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("exceeds the max relative expiration", e); + } + // Test that max is enforced on modify for relative and absolute Expirations + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry) + .setId(listInfo.getId()) + .setExpiration(Expiration.newRelative(poolExpiration+1)) + .build()); + fail("Modified a directive to exceed pool's max relative expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("exceeds the max relative expiration", e); + } + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry) + .setId(listInfo.getId()) + .setExpiration(Expiration.newAbsolute( + new Date().getTime() + poolExpiration + (10*1000))) + .build()); + fail("Modified a directive to exceed pool's max relative expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("exceeds the max relative expiration", e); + } + // Test some giant limit values with add + try { + dfs.addCacheDirective(builder + .setExpiration(Expiration.newRelative( + Long.MAX_VALUE)) + .build()); + fail("Added a directive with a gigantic max value"); + } catch (IllegalArgumentException e) { + assertExceptionContains("is too far in the future", e); + } + try { + dfs.addCacheDirective(builder + .setExpiration(Expiration.newAbsolute( + Long.MAX_VALUE)) + .build()); + fail("Added a directive with a gigantic max value"); + } catch (InvalidRequestException e) { + assertExceptionContains("is too far in the future", e); + } + // Test some giant limit values with modify + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry) + .setId(listInfo.getId()) + .setExpiration(Expiration.NEVER) + .build()); + fail("Modified a directive to exceed pool's max relative expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("exceeds the max relative expiration", e); + } + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry) + .setId(listInfo.getId()) + .setExpiration(Expiration.newAbsolute( + Long.MAX_VALUE)) + .build()); + fail("Modified a directive to exceed pool's max relative expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("is too far in the future", e); + } + // Test that the max is enforced on modify correctly when changing pools + CachePoolInfo destPool = new CachePoolInfo("destPool"); + dfs.addCachePool(destPool.setMaxRelativeExpiryMs(poolExpiration / 2)); + try { + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry) + .setId(listInfo.getId()) + .setPool(destPool.getPoolName()) + .build()); + fail("Modified a directive to a pool with a lower max expiration"); + } catch (InvalidRequestException e) { + assertExceptionContains("exceeds the max relative expiration", e); + } + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry) + .setId(listInfo.getId()) + .setPool(destPool.getPoolName()) + .setExpiration(Expiration.newRelative(poolExpiration / 2)) + .build()); + dirIt = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder() + .setPool(destPool.getPoolName()) + .build()); + listInfo = dirIt.next().getInfo(); + listExpiration = listInfo.getExpiration().getAbsoluteMillis() + - new Date().getTime(); + assertTrue("Unexpected relative expiry " + listExpiration + + " expected approximately " + poolExpiration/2, + Math.abs(poolExpiration/2 - listExpiration) < 10*1000); + // Test that cache pool and directive expiry can be modified back to never + dfs.modifyCachePool(destPool + .setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER)); + poolIt = dfs.listCachePools(); + listPool = poolIt.next().getInfo(); + while (!listPool.getPoolName().equals(destPool.getPoolName())) { + listPool = poolIt.next().getInfo(); + } + assertEquals("Expected max relative expiry to match set value", + CachePoolInfo.RELATIVE_EXPIRY_NEVER, + listPool.getMaxRelativeExpiryMs().longValue()); + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder() + .setId(listInfo.getId()) + .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER)) + .build()); + // Test modifying close to the limit + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder() + .setId(listInfo.getId()) + .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER - 1)) + .build()); + } + + @Test(timeout=60000) + public void testExceedsCapacity() throws Exception { + // Create a giant file + final Path fileName = new Path("/exceeds"); + final long fileLen = CACHE_CAPACITY * (NUM_DATANODES*2); + int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE); + DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES, + 0xFADED); + // Set up a log appender watcher + final LogVerificationAppender appender = new LogVerificationAppender(); + final Logger logger = Logger.getRootLogger(); + logger.addAppender(appender); + dfs.addCachePool(new CachePoolInfo("pool")); + dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool") + .setPath(fileName).setReplication((short) 1).build()); + waitForCachedBlocks(namenode, -1, numCachedReplicas, + "testExceeds:1"); + // Check that no DNs saw an excess CACHE message + int lines = appender.countLinesWithMessage( + "more bytes in the cache: " + + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY); + assertEquals("Namenode should not send extra CACHE commands", 0, lines); + // Try creating a file with giant-sized blocks that exceed cache capacity + dfs.delete(fileName, false); + DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2, + (short) 1, 0xFADED); + // Nothing will get cached, so just force sleep for a bit + Thread.sleep(4000); + // Still should not see any excess commands + lines = appender.countLinesWithMessage( + "more bytes in the cache: " + + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY); + assertEquals("Namenode should not send extra CACHE commands", 0, lines); + } +} Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Wed Jan 22 21:43:00 2014 @@ -143,7 +143,8 @@ public class TestDeadDatanode { StorageReport[] rep = { new StorageReport( new DatanodeStorage(reg.getDatanodeUuid()), false, 0, 0, 0, 0) }; - DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands(); + DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0) + .getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction()); Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java Wed Jan 22 21:43:00 2014 @@ -31,7 +31,10 @@ import javax.management.ObjectName; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; import org.apache.hadoop.util.VersionInfo; import org.junit.Test; import org.mortbay.util.ajax.JSON; @@ -46,10 +49,16 @@ public class TestNameNodeMXBean { */ private static final double DELTA = 0.000001; + static { + NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); + } + @SuppressWarnings({ "unchecked" }) @Test public void testNameNodeMXBeanInfo() throws Exception { Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + NativeIO.POSIX.getCacheManipulator().getMemlockLimit()); MiniDFSCluster cluster = null; try { @@ -171,6 +180,10 @@ public class TestNameNodeMXBean { } assertEquals(1, statusMap.get("active").size()); assertEquals(1, statusMap.get("failed").size()); + assertEquals(0L, mbs.getAttribute(mxbeanName, "CacheUsed")); + assertEquals(NativeIO.POSIX.getCacheManipulator().getMemlockLimit() * + cluster.getDataNodes().size(), + mbs.getAttribute(mxbeanName, "CacheCapacity")); } finally { if (cluster != null) { for (URI dir : cluster.getNameDirs(0)) { Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java Wed Jan 22 21:43:00 2014 @@ -414,7 +414,7 @@ public class TestNamenodeRetryCache { LightWeightCache cacheSet = (LightWeightCache) namesystem.getRetryCache().getCacheSet(); - assertEquals(14, cacheSet.size()); + assertEquals(20, cacheSet.size()); Map oldEntries = new HashMap(); @@ -433,7 +433,7 @@ public class TestNamenodeRetryCache { assertTrue(namesystem.hasRetryCache()); cacheSet = (LightWeightCache) namesystem .getRetryCache().getCacheSet(); - assertEquals(14, cacheSet.size()); + assertEquals(20, cacheSet.size()); iter = cacheSet.iterator(); while (iter.hasNext()) { CacheEntry entry = iter.next(); Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java Wed Jan 22 21:43:00 2014 @@ -27,6 +27,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; +import java.util.LinkedList; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -59,6 +60,8 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import com.google.common.util.concurrent.Uninterruptibles; + /** * Tests state transition from active->standby, and manual failover * and failback between two namenodes. @@ -124,6 +127,17 @@ public class TestHAStateTransitions { } } + private void addCrmThreads(MiniDFSCluster cluster, + LinkedList crmThreads) { + for (int nn = 0; nn <= 1; nn++) { + Thread thread = cluster.getNameNode(nn).getNamesystem(). + getCacheManager().getCacheReplicationMonitor(); + if (thread != null) { + crmThreads.add(thread); + } + } + } + /** * Test that transitioning a service to the state that it is already * in is a nop, specifically, an exception is not thrown. @@ -131,19 +145,30 @@ public class TestHAStateTransitions { @Test public void testTransitionToCurrentStateIsANop() throws Exception { Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(1) .build(); + LinkedList crmThreads = new LinkedList(); try { cluster.waitActive(); + addCrmThreads(cluster, crmThreads); cluster.transitionToActive(0); + addCrmThreads(cluster, crmThreads); cluster.transitionToActive(0); + addCrmThreads(cluster, crmThreads); cluster.transitionToStandby(0); + addCrmThreads(cluster, crmThreads); cluster.transitionToStandby(0); + addCrmThreads(cluster, crmThreads); } finally { cluster.shutdown(); } + // Verify that all cacheReplicationMonitor threads shut down + for (Thread thread : crmThreads) { + Uninterruptibles.joinUninterruptibly(thread); + } } /** Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java?rev=1560522&r1=1560521&r2=1560522&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Wed Jan 22 21:43:00 2014 @@ -29,6 +29,7 @@ import java.net.URI; import java.net.UnknownHostException; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -37,11 +38,13 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -53,12 +56,16 @@ import org.apache.hadoop.hdfs.MiniDFSNNT import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INodeFile; @@ -82,6 +89,7 @@ public class TestRetryCacheWithHA { private static final int BlockSize = 1024; private static final short DataNodes = 3; private static final int CHECKTIMES = 10; + private static final int ResponseSize = 3; private MiniDFSCluster cluster; private DistributedFileSystem dfs; @@ -116,6 +124,8 @@ public class TestRetryCacheWithHA { @Before public void setup() throws Exception { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(DataNodes).build(); @@ -147,7 +157,7 @@ public class TestRetryCacheWithHA { FSNamesystem fsn0 = cluster.getNamesystem(0); LightWeightCache cacheSet = (LightWeightCache) fsn0.getRetryCache().getCacheSet(); - assertEquals(14, cacheSet.size()); + assertEquals(20, cacheSet.size()); Map oldEntries = new HashMap(); @@ -168,7 +178,7 @@ public class TestRetryCacheWithHA { FSNamesystem fsn1 = cluster.getNamesystem(1); cacheSet = (LightWeightCache) fsn1 .getRetryCache().getCacheSet(); - assertEquals(14, cacheSet.size()); + assertEquals(20, cacheSet.size()); iter = cacheSet.iterator(); while (iter.hasNext()) { CacheEntry entry = iter.next(); @@ -734,6 +744,263 @@ public class TestRetryCacheWithHA { } } + /** addCacheDirective */ + class AddCacheDirectiveInfoOp extends AtMostOnceOp { + private CacheDirectiveInfo directive; + private Long result; + + AddCacheDirectiveInfoOp(DFSClient client, + CacheDirectiveInfo directive) { + super("addCacheDirective", client); + this.directive = directive; + } + + @Override + void prepare() throws Exception { + dfs.addCachePool(new CachePoolInfo(directive.getPool())); + } + + @Override + void invoke() throws Exception { + result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); + } + + @Override + boolean checkNamenodeBeforeReturn() throws Exception { + for (int i = 0; i < CHECKTIMES; i++) { + RemoteIterator iter = + dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder(). + setPool(directive.getPool()). + setPath(directive.getPath()). + build()); + if (iter.hasNext()) { + return true; + } + Thread.sleep(1000); + } + return false; + } + + @Override + Object getResult() { + return result; + } + } + + /** modifyCacheDirective */ + class ModifyCacheDirectiveInfoOp extends AtMostOnceOp { + private final CacheDirectiveInfo directive; + private final short newReplication; + private long id; + + ModifyCacheDirectiveInfoOp(DFSClient client, + CacheDirectiveInfo directive, short newReplication) { + super("modifyCacheDirective", client); + this.directive = directive; + this.newReplication = newReplication; + } + + @Override + void prepare() throws Exception { + dfs.addCachePool(new CachePoolInfo(directive.getPool())); + id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); + } + + @Override + void invoke() throws Exception { + client.modifyCacheDirective( + new CacheDirectiveInfo.Builder(). + setId(id). + setReplication(newReplication). + build(), EnumSet.of(CacheFlag.FORCE)); + } + + @Override + boolean checkNamenodeBeforeReturn() throws Exception { + for (int i = 0; i < CHECKTIMES; i++) { + RemoteIterator iter = + dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder(). + setPool(directive.getPool()). + setPath(directive.getPath()). + build()); + while (iter.hasNext()) { + CacheDirectiveInfo result = iter.next().getInfo(); + if ((result.getId() == id) && + (result.getReplication().shortValue() == newReplication)) { + return true; + } + } + Thread.sleep(1000); + } + return false; + } + + @Override + Object getResult() { + return null; + } + } + + /** removeCacheDirective */ + class RemoveCacheDirectiveInfoOp extends AtMostOnceOp { + private CacheDirectiveInfo directive; + private long id; + + RemoveCacheDirectiveInfoOp(DFSClient client, String pool, + String path) { + super("removeCacheDirective", client); + this.directive = new CacheDirectiveInfo.Builder(). + setPool(pool). + setPath(new Path(path)). + build(); + } + + @Override + void prepare() throws Exception { + dfs.addCachePool(new CachePoolInfo(directive.getPool())); + id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); + } + + @Override + void invoke() throws Exception { + client.removeCacheDirective(id); + } + + @Override + boolean checkNamenodeBeforeReturn() throws Exception { + for (int i = 0; i < CHECKTIMES; i++) { + RemoteIterator iter = + dfs.listCacheDirectives( + new CacheDirectiveInfo.Builder(). + setPool(directive.getPool()). + setPath(directive.getPath()). + build()); + if (!iter.hasNext()) { + return true; + } + Thread.sleep(1000); + } + return false; + } + + @Override + Object getResult() { + return null; + } + } + + /** addCachePool */ + class AddCachePoolOp extends AtMostOnceOp { + private String pool; + + AddCachePoolOp(DFSClient client, String pool) { + super("addCachePool", client); + this.pool = pool; + } + + @Override + void prepare() throws Exception { + } + + @Override + void invoke() throws Exception { + client.addCachePool(new CachePoolInfo(pool)); + } + + @Override + boolean checkNamenodeBeforeReturn() throws Exception { + for (int i = 0; i < CHECKTIMES; i++) { + RemoteIterator iter = dfs.listCachePools(); + if (iter.hasNext()) { + return true; + } + Thread.sleep(1000); + } + return false; + } + + @Override + Object getResult() { + return null; + } + } + + /** modifyCachePool */ + class ModifyCachePoolOp extends AtMostOnceOp { + String pool; + + ModifyCachePoolOp(DFSClient client, String pool) { + super("modifyCachePool", client); + this.pool = pool; + } + + @Override + void prepare() throws Exception { + client.addCachePool(new CachePoolInfo(pool).setLimit(10l)); + } + + @Override + void invoke() throws Exception { + client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l)); + } + + @Override + boolean checkNamenodeBeforeReturn() throws Exception { + for (int i = 0; i < CHECKTIMES; i++) { + RemoteIterator iter = dfs.listCachePools(); + if (iter.hasNext() && (long)iter.next().getInfo().getLimit() == 99) { + return true; + } + Thread.sleep(1000); + } + return false; + } + + @Override + Object getResult() { + return null; + } + } + + /** removeCachePool */ + class RemoveCachePoolOp extends AtMostOnceOp { + private String pool; + + RemoveCachePoolOp(DFSClient client, String pool) { + super("removeCachePool", client); + this.pool = pool; + } + + @Override + void prepare() throws Exception { + client.addCachePool(new CachePoolInfo(pool)); + } + + @Override + void invoke() throws Exception { + client.removeCachePool(pool); + } + + @Override + boolean checkNamenodeBeforeReturn() throws Exception { + for (int i = 0; i < CHECKTIMES; i++) { + RemoteIterator iter = dfs.listCachePools(); + if (!iter.hasNext()) { + return true; + } + Thread.sleep(1000); + } + return false; + } + + @Override + Object getResult() { + return null; + } + } + @Test (timeout=60000) public void testCreateSnapshot() throws Exception { final DFSClient client = genClientWithDummyHandler(); @@ -811,6 +1078,58 @@ public class TestRetryCacheWithHA { testClientRetryWithFailover(op); } + @Test (timeout=60000) + public void testAddCacheDirectiveInfo() throws Exception { + DFSClient client = genClientWithDummyHandler(); + AtMostOnceOp op = new AddCacheDirectiveInfoOp(client, + new CacheDirectiveInfo.Builder(). + setPool("pool"). + setPath(new Path("/path")). + build()); + testClientRetryWithFailover(op); + } + + @Test (timeout=60000) + public void testModifyCacheDirectiveInfo() throws Exception { + DFSClient client = genClientWithDummyHandler(); + AtMostOnceOp op = new ModifyCacheDirectiveInfoOp(client, + new CacheDirectiveInfo.Builder(). + setPool("pool"). + setPath(new Path("/path")). + setReplication((short)1).build(), + (short)555); + testClientRetryWithFailover(op); + } + + @Test (timeout=60000) + public void testRemoveCacheDescriptor() throws Exception { + DFSClient client = genClientWithDummyHandler(); + AtMostOnceOp op = new RemoveCacheDirectiveInfoOp(client, "pool", + "/path"); + testClientRetryWithFailover(op); + } + + @Test (timeout=60000) + public void testAddCachePool() throws Exception { + DFSClient client = genClientWithDummyHandler(); + AtMostOnceOp op = new AddCachePoolOp(client, "pool"); + testClientRetryWithFailover(op); + } + + @Test (timeout=60000) + public void testModifyCachePool() throws Exception { + DFSClient client = genClientWithDummyHandler(); + AtMostOnceOp op = new ModifyCachePoolOp(client, "pool"); + testClientRetryWithFailover(op); + } + + @Test (timeout=60000) + public void testRemoveCachePool() throws Exception { + DFSClient client = genClientWithDummyHandler(); + AtMostOnceOp op = new RemoveCachePoolOp(client, "pool"); + testClientRetryWithFailover(op); + } + /** * When NN failover happens, if the client did not receive the response and * send a retry request to the other NN, the same response should be recieved @@ -863,4 +1182,92 @@ public class TestRetryCacheWithHA { + results.get(op.name)); } } + + /** + * Add a list of cache pools, list cache pools, + * switch active NN, and list cache pools again. + */ + @Test (timeout=60000) + public void testListCachePools() throws Exception { + final int poolCount = 7; + HashSet poolNames = new HashSet(poolCount); + for (int i=0; i poolNames = new HashSet(poolCount); + Path path = new Path("/p"); + for (int i=0; i poolNames, int active) throws Exception { + HashSet tmpNames = (HashSet)poolNames.clone(); + RemoteIterator pools = dfs.listCachePools(); + int poolCount = poolNames.size(); + for (int i=0; i poolNames, int active) throws Exception { + HashSet tmpNames = (HashSet)poolNames.clone(); + RemoteIterator directives = dfs.listCacheDirectives(null); + int poolCount = poolNames.size(); + for (int i=0; i