hadoop-hdfs-commits mailing list archives

From: w...@apache.org
Subject: svn commit: r1560522 [7/9] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/m...
Date: Wed, 22 Jan 2014 21:43:04 GMT
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm Wed Jan 22 21:43:00 2014
@@ -0,0 +1,301 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~   http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop Distributed File System-${project.version} - Centralized Cache Management in HDFS
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Centralized Cache Management in HDFS
+
+  \[ {{{./index.html}Go Back}} \]
+
+%{toc|section=1|fromDepth=2|toDepth=4}
+
+* {Background}
+
+  Normally, HDFS relies on the operating system to cache data it reads from disk.
+  However, HDFS can also be configured to use centralized cache management. Under
+  centralized cache management, the HDFS NameNode itself decides which blocks
+  should be cached, and where they should be cached.
+
+  Centralized cache management has several advantages. First of all, it
+  prevents frequently used block files from being evicted from memory. This is
+  particularly important when the size of the working set exceeds the size of
+  main memory, which is true for many big data applications. Secondly, when
+  HDFS decides what should be cached, it can let clients know about this
+  information through the getFileBlockLocations API. Finally, when the DataNode
+  knows a block is locked into memory, it can provide access to that block via
+  mmap.
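
  As a rough illustration of the last two points, here is a client-side
  sketch (not part of this patch; the class name and path are invented, and
  it assumes the BlockLocation#getCachedHosts accessor introduced alongside
  this feature). It uses getFileBlockLocations to see which hosts hold each
  block and which of them report the block as cached:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CachedLocationsExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/warehouse/fact_table");  // invented path
        FileStatus stat = fs.getFileStatus(path);
        // Each BlockLocation lists the hosts holding a replica and,
        // separately, the hosts that have the block cached in memory.
        for (BlockLocation loc :
            fs.getFileBlockLocations(stat, 0, stat.getLen())) {
          System.out.println("offset " + loc.getOffset()
              + " hosts=" + java.util.Arrays.toString(loc.getHosts())
              + " cachedHosts="
              + java.util.Arrays.toString(loc.getCachedHosts()));
        }
      }
    }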
+
+* {Use Cases}
+
+  Centralized cache management is most useful for files which are accessed very
+  often. For example, a "fact table" in Hive which is often used in joins is a
+  good candidate for caching. On the other hand, when running a classic
+  "word count" MapReduce job which counts the number of words in each
+  document, there may not be any good candidates for caching, since all the
+  files may be accessed exactly once.
+
+* {Architecture}
+
+[images/caching.png] Caching Architecture
+
+  With centralized cache management, the NameNode coordinates all caching
+  across the cluster. It receives cache information from each DataNode via the
+  cache report, a periodic message that describes all the block IDs cached on
+  a given DataNode. The NameNode will reply to DataNode heartbeat messages
+  with commands telling it which blocks to cache and which to uncache.
+
+  The NameNode stores a set of path cache directives, which tell it which files
+  to cache. The NameNode also stores a set of cache pools, which are groups of
+  cache directives.  These directives and pools are persisted to the edit log
+  and fsimage, and will be loaded if the cluster is restarted.
+
+  Periodically, the NameNode rescans the namespace, to see which blocks need to
+  be cached based on the current set of path cache directives. Rescans are also
+  triggered by relevant user actions, such as adding or removing a cache
+  directive or removing a cache pool.
+
+  Cache directives may also specify a numeric cache replication, which is the
+  number of replicas to cache.  This number may be equal to or smaller than the
+  file's block replication.  If multiple cache directives cover the same file
+  with different cache replication settings, then the highest cache replication
+  setting is applied.
+
+  We do not currently cache blocks which are under construction, corrupt, or
+  otherwise incomplete.  If a cache directive covers a symlink, the symlink
+  target is not cached.
+
+  Caching is currently done on a per-file basis, although we would like to add
+  block-level granularity in the future.
+
+* {Interface}
+
+  The NameNode stores a list of "cache directives."  These directives contain a
+  path as well as the number of times blocks in that path should be replicated.
+
+  Paths can be either directories or files. If the path specifies a file, that
+  file is cached. If the path specifies a directory, all the files in the
+  directory will be cached. However, this process is not recursive; only the
+  direct children of the directory will be cached.
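
  The directives and pools described below can also be managed
  programmatically. The following sketch simply strings together the
  DistributedFileSystem calls exercised by the DFSTestUtil changes later in
  this commit (addCachePool, addCacheDirective, removeCacheDirective,
  removeCachePool); the pool name and path are invented:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CacheFlag;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    public class CacheDirectiveExample {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        // Create a pool, then add a directive that caches the blocks of
        // /warehouse/fact_table with a cache replication of 2.
        dfs.addCachePool(new CachePoolInfo("hive-pool"));
        long id = dfs.addCacheDirective(
            new CacheDirectiveInfo.Builder()
                .setPath(new Path("/warehouse/fact_table"))
                .setReplication((short) 2)
                .setPool("hive-pool")
                .build(),
            EnumSet.of(CacheFlag.FORCE));
        // Directives are identified by the 64-bit id returned above.
        dfs.removeCacheDirective(id);
        dfs.removeCachePool("hive-pool");
      }
    }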
+
+** {hdfs cacheadmin Shell}
+
+  Path cache directives can be created by the <<<hdfs cacheadmin
+  -addDirective>>> command and removed via the <<<hdfs cacheadmin
+  -removeDirective>>> command. To list the current path cache directives, use
+  <<<hdfs cacheadmin -listDirectives>>>. Each path cache directive has a
+  unique 64-bit ID number which will not be reused if it is deleted.  To remove
+  all path cache directives with a specified path, use <<<hdfs cacheadmin
+  -removeDirectives>>>.
+
+  Directives are grouped into "cache pools."  Each cache pool gets a share of
+  the cluster's resources. Additionally, cache pools are used for
+  permission checking. Cache pools have a mode, user, and group, similar to
+  regular files. The same permission rules are applied as for normal files. So, for
+  example, if the mode is 0777, any user can add or remove directives from the
+  cache pool. If the mode is 0644, only the owner can write to the cache pool,
+  but anyone can read from it. And so forth.
+
+  Cache pools are identified by name. They can be created by the <<<hdfs
+  cacheadmin -addPool>>> command, modified by the <<<hdfs cacheadmin
+  -modifyPool>>> command, and removed via the <<<hdfs cacheadmin
+  -removePool>>> command. To list the current cache pools, use <<<hdfs
+  cacheadmin -listPools>>>.
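
  For illustration, the short driver below invokes the same
  org.apache.hadoop.hdfs.tools.CacheAdmin tool that backs <<<hdfs
  cacheadmin>>>, via ToolRunner, just as the CacheAdminCmdExecutor test
  helper added by this commit does. The pool name, path, and option values
  are only examples:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.CacheAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class CacheAdminDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to: hdfs cacheadmin -addPool hive-pool -mode 0755
        ToolRunner.run(new CacheAdmin(conf),
            new String[] {"-addPool", "hive-pool", "-mode", "0755"});
        // Equivalent to: hdfs cacheadmin -addDirective -path
        //   /warehouse/fact_table -replication 2 -pool hive-pool
        ToolRunner.run(new CacheAdmin(conf),
            new String[] {"-addDirective", "-path", "/warehouse/fact_table",
                "-replication", "2", "-pool", "hive-pool"});
        // Equivalent to: hdfs cacheadmin -listDirectives -pool hive-pool
        ToolRunner.run(new CacheAdmin(conf),
            new String[] {"-listDirectives", "-pool", "hive-pool"});
      }
    }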
+
+*** {addDirective}
+
+  Usage: <<<hdfs cacheadmin -addDirective -path <path> -replication <replication> -pool <pool-name> >>>
+
+  Add a new cache directive.
+
+*--+--+
+\<path\> | A path to cache. The path can be a directory or a file.
+*--+--+
+\<replication\> | The cache replication factor to use. Defaults to 1.
+*--+--+
+\<pool-name\> | The pool to which the directive will be added. You must have write permission on the cache pool in order to add new directives.
+*--+--+
+
+*** {removeDirective}
+
+  Usage: <<<hdfs cacheadmin -removeDirective <id> >>>
+
+  Remove a cache directive.
+
+*--+--+
\<id\> | The id of the cache directive to remove.  You must have write permission on the pool of the directive in order to remove it.  To see a list of cache directive IDs, use the -listDirectives command.
+*--+--+
+
+*** {removeDirectives}
+
+  Usage: <<<hdfs cacheadmin -removeDirectives <path> >>>
+
+  Remove every cache directive with the specified path.
+
+*--+--+
+\<path\> | The path of the cache directives to remove.  You must have write permission on the pool of the directive in order to remove it.  To see a list of cache directives, use the -listDirectives command.
+*--+--+
+
+*** {listDirectives}
+
+  Usage: <<<hdfs cacheadmin -listDirectives [-path <path>] [-pool <pool>] >>>
+
+  List cache directives.
+
+*--+--+
+\<path\> | List only cache directives with this path. Note that if there is a cache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
+*--+--+
+\<pool\> | List only path cache directives in that pool.
+*--+--+
+
+*** {addPool}
+
+  Usage: <<<hdfs cacheadmin -addPool <name> [-owner <owner>] [-group <group>] [-mode <mode>] [-weight <weight>] >>>
+
+  Add a new cache pool.
+
+*--+--+
+\<name\> | Name of the new pool.
+*--+--+
+\<owner\> | Username of the owner of the pool. Defaults to the current user.
+*--+--+
+\<group\> | Group of the pool. Defaults to the primary group name of the current user.
+*--+--+
+\<mode\> | UNIX-style permissions for the pool. Permissions are specified in octal, e.g. 0755. By default, this is set to 0755.
+*--+--+
+\<weight\> | Weight of the pool. This is a relative measure of the importance of the pool used during cache resource management. By default, it is set to 100.
+*--+--+
+
+*** {modifyPool}
+
+  Usage: <<<hdfs cacheadmin -modifyPool <name> [-owner <owner>] [-group <group>] [-mode <mode>] [-weight <weight>] >>>
+
+  Modifies the metadata of an existing cache pool.
+
+*--+--+
+\<name\> | Name of the pool to modify.
+*--+--+
+\<owner\> | Username of the owner of the pool.
+*--+--+
+\<group\> | Groupname of the group of the pool.
+*--+--+
+\<mode\> | Unix-style permissions of the pool in octal.
+*--+--+
+\<weight\> | Weight of the pool.
+*--+--+
+
+*** {removePool}
+
+  Usage: <<<hdfs cacheadmin -removePool <name> >>>
+
+  Remove a cache pool. This also uncaches paths associated with the pool.
+
+*--+--+
+\<name\> | Name of the cache pool to remove.
+*--+--+
+
+*** {listPools}
+
+  Usage: <<<hdfs cacheadmin -listPools [name] >>>
+
+  Display information about one or more cache pools, e.g. name, owner, group,
+  permissions, etc.
+
+*--+--+
+\<name\> | If specified, list only the named cache pool.
+*--+--+
+
+*** {help}
+
+  Usage: <<<hdfs cacheadmin -help <command-name> >>>
+
+  Get detailed help about a command.
+
+*--+--+
+\<command-name\> | The command for which to get detailed help. If no command is specified, print detailed help for all commands.
+*--+--+
+
+* {Configuration}
+
+** {Native Libraries}
+
+  In order to lock block files into memory, the DataNode relies on native JNI
+  code found in <<<libhadoop.so>>>. Be sure to
+  {{{../hadoop-common/NativeLibraries.html}enable JNI}} if you are using HDFS
+  centralized cache management.
+
+** {Configuration Properties}
+
+*** Required
+
+  Be sure to configure the following:
+
+  * dfs.datanode.max.locked.memory
+
+    The DataNode will treat this as the maximum amount of memory it can use for
+    its cache. When setting this value, please remember that you will need space
+    in memory for other things, such as the Java virtual machine (JVM) itself
+    and the operating system's page cache.
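
  In embedded or test settings the same key can be set programmatically, as
  the TestDatanodeConfig change in this commit does; the sketch below is
  illustrative only, the 64 MB value is an arbitrary example, and production
  deployments would normally configure this in hdfs-site.xml instead:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class CacheConfigExample {
      public static Configuration cachingConf() {
        Configuration conf = new Configuration();
        // Maximum memory, in bytes, that the DataNode may use for caching;
        // leave headroom for the JVM and the OS page cache.
        conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            64L * 1024 * 1024);
        return conf;
      }
    }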
+
+*** Optional
+
+  The following properties are not required, but may be specified for tuning:
+
+  * dfs.namenode.path.based.cache.refresh.interval.ms
+
+    The NameNode will use this as the number of milliseconds between
+    subsequent path cache rescans.  Each rescan calculates which blocks should
+    be cached and which DataNode holding a replica of each block should cache
+    it.
+
+    By default, this parameter is set to 300000, which is five minutes.
+
+  * dfs.datanode.fsdatasetcache.max.threads.per.volume
+
+    The DataNode will use this as the maximum number of threads per volume to
+    use for caching new data.
+
+    By default, this parameter is set to 4.
+
+  * dfs.cachereport.intervalMsec
+
+    The DataNode will use this as the number of milliseconds between sending
+    full reports of its cache state to the NameNode.
+
+    By default, this parameter is set to 10000, which is 10 seconds.
+
+  * dfs.namenode.path.based.cache.block.map.allocation.percent
+
+    The percentage of the Java heap which we will allocate to the cached blocks
+    map.  The cached blocks map is a hash map which uses chained hashing.
+    Smaller maps may be accessed more slowly if the number of cached blocks is
+    large; larger maps will consume more memory.  The default is 0.25 percent.
+
+** {OS Limits}
+
+  If you get the error "Cannot start datanode because the configured max
+  locked memory size... is more than the datanode's available RLIMIT_MEMLOCK
+  ulimit," that means that the operating system is imposing a lower limit
+  on the amount of memory that you can lock than what you have configured. To
+  fix this, you must adjust the ulimit -l value that the DataNode runs with.
+  Usually, this value is configured in <<</etc/security/limits.conf>>>.
+  However, it will vary depending on what operating system and distribution
+  you are using.
+
+  You will know that you have correctly configured this value when you can run
+  <<<ulimit -l>>> from the shell and get back either a higher value than what
+  you have configured with <<<dfs.datanode.max.locked.memory>>>, or the string
+  "unlimited," indicating that there is no limit.  Note that it's typical for
+  <<<ulimit -l>>> to output the memory lock limit in KB, but
+  dfs.datanode.max.locked.memory must be specified in bytes.

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/caching.png
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/caching.png?rev=1560522&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/caching.png
------------------------------------------------------------------------------
    svn:mime-type = image/png

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCacheAdminCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCacheAdminCLI.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCacheAdminCLI.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCacheAdminCLI.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cli;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.cli.util.CLICommand;
+import org.apache.hadoop.cli.util.CLICommandCacheAdmin;
+import org.apache.hadoop.cli.util.CLICommandTypes;
+import org.apache.hadoop.cli.util.CLITestCmd;
+import org.apache.hadoop.cli.util.CacheAdminCmdExecutor;
+import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.cli.util.CommandExecutor.Result;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.CacheAdmin;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class TestCacheAdminCLI extends CLITestHelper {
+
+  public static final Log LOG = LogFactory.getLog(TestCacheAdminCLI.class);
+
+  protected MiniDFSCluster dfsCluster = null;
+  protected FileSystem fs = null;
+  protected String namenode = null;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
+        HDFSPolicyProvider.class, PolicyProvider.class);
+
+    // Many of the tests expect a replication value of 1 in the output
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+
+    dfsCluster.waitClusterUp();
+    namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
+    username = System.getProperty("user.name");
+
+    fs = dfsCluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+               fs instanceof DistributedFileSystem);
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+    Thread.sleep(2000);
+    super.tearDown();
+  }
+
+  @Override
+  protected String getTestFile() {
+    return "testCacheAdminConf.xml";
+  }
+
+  @Override
+  protected TestConfigFileParser getConfigParser() {
+    return new TestConfigFileParserCacheAdmin();
+  }
+
+  private class TestConfigFileParserCacheAdmin extends
+      CLITestHelper.TestConfigFileParser {
+    @Override
+    public void endElement(String uri, String localName, String qName)
+        throws SAXException {
+      if (qName.equals("cache-admin-command")) {
+        if (testCommands != null) {
+          testCommands.add(new CLITestCmdCacheAdmin(charString,
+              new CLICommandCacheAdmin()));
+        } else if (cleanupCommands != null) {
+          cleanupCommands.add(new CLITestCmdCacheAdmin(charString,
+              new CLICommandCacheAdmin()));
+        }
+      } else {
+        super.endElement(uri, localName, qName);
+      }
+    }
+  }
+
+  private class CLITestCmdCacheAdmin extends CLITestCmd {
+
+    public CLITestCmdCacheAdmin(String str, CLICommandTypes type) {
+      super(str, type);
+    }
+
+    @Override
+    public CommandExecutor getExecutor(String tag)
+        throws IllegalArgumentException {
+      if (getType() instanceof CLICommandCacheAdmin) {
+        return new CacheAdminCmdExecutor(tag, new CacheAdmin(conf));
+      }
+      return super.getExecutor(tag);
+    }
+  }
+
+  @Override
+  protected Result execute(CLICommand cmd) throws Exception {
+    return cmd.getExecutor("").executeCommand(cmd.getCmd());
+  }
+
+  @Test
+  @Override
+  public void testAll () {
+    super.testAll();
+  }
+}

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCacheAdminCLI.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandCacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandCacheAdmin.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandCacheAdmin.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandCacheAdmin.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli.util;
+
+public class CLICommandCacheAdmin implements CLICommandTypes {
+}

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandCacheAdmin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CacheAdminCmdExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CacheAdminCmdExecutor.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CacheAdminCmdExecutor.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CacheAdminCmdExecutor.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli.util;
+
+import org.apache.hadoop.hdfs.tools.CacheAdmin;
+import org.apache.hadoop.util.ToolRunner;
+
+public class CacheAdminCmdExecutor extends CommandExecutor {
+  protected String namenode = null;
+  protected CacheAdmin admin = null;
+
+  public CacheAdminCmdExecutor(String namenode, CacheAdmin admin) {
+    this.namenode = namenode;
+    this.admin = admin;
+  }
+
+  @Override
+  protected void execute(final String cmd) throws Exception {
+    String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode);
+    ToolRunner.run(admin, args);
+  }
+}

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CacheAdminCmdExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Jan 22 21:43:00 2014
@@ -850,7 +850,7 @@ public class DFSTestUtil {
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
-        1, 2, 3, 4, 5, 6, "local", adminState);
+        1l, 2l, 3l, 4l, 0l, 0l, 5, 6, "local", adminState);
   }
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
@@ -1060,6 +1060,27 @@ public class DFSTestUtil {
       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
           cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
+    // OP_ADD_CACHE_POOL
+    filesystem.addCachePool(new CachePoolInfo("pool1"));
+    // OP_MODIFY_CACHE_POOL
+    filesystem.modifyCachePool(new CachePoolInfo("pool1").setLimit(99l));
+    // OP_ADD_PATH_BASED_CACHE_DIRECTIVE
+    long id = filesystem.addCacheDirective(
+        new CacheDirectiveInfo.Builder().
+            setPath(new Path("/path")).
+            setReplication((short)1).
+            setPool("pool1").
+            build(), EnumSet.of(CacheFlag.FORCE));
+    // OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
+    filesystem.modifyCacheDirective(
+        new CacheDirectiveInfo.Builder().
+            setId(id).
+            setReplication((short)2).
+            build(), EnumSet.of(CacheFlag.FORCE));
+    // OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
+    filesystem.removeCacheDirective(id);
+    // OP_REMOVE_CACHE_POOL
+    filesystem.removeCachePool("pool1");
   }
 
   public static void abortStream(DFSOutputStream out) throws IOException {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java Wed Jan 22 21:43:00 2014
@@ -61,4 +61,15 @@ public class LogVerificationAppender ext
     }
     return count;
   }
+
+  public int countLinesWithMessage(final String text) {
+    int count = 0;
+    for (LoggingEvent e: getLog()) {
+      String msg = e.getRenderedMessage();
+      if (msg != null && msg.contains(text)) {
+        count++;
+      }
+    }
+    return count;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java Wed Jan 22 21:43:00 2014
@@ -30,6 +30,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -725,4 +726,42 @@ public class TestDFSUtil {
         DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
         DFSUtil.getSpnegoKeytabKey(conf, defaultKey));
   }
+
+  @Test(timeout=1000)
+  public void testDurationToString() throws Exception {
+    assertEquals("000:00:00:00.000", DFSUtil.durationToString(0));
+    assertEquals("001:01:01:01.000",
+        DFSUtil.durationToString(((24*60*60)+(60*60)+(60)+1)*1000));
+    assertEquals("000:23:59:59.999",
+        DFSUtil.durationToString(((23*60*60)+(59*60)+(59))*1000+999));
+    assertEquals("-001:01:01:01.000",
+        DFSUtil.durationToString(-((24*60*60)+(60*60)+(60)+1)*1000));
+    assertEquals("-000:23:59:59.574",
+        DFSUtil.durationToString(-(((23*60*60)+(59*60)+(59))*1000+574)));
+  }
+
+  @Test(timeout=5000)
+  public void testRelativeTimeConversion() throws Exception {
+    try {
+      DFSUtil.parseRelativeTime("1");
+    } catch (IOException e) {
+      assertExceptionContains("too short", e);
+    }
+    try {
+      DFSUtil.parseRelativeTime("1z");
+    } catch (IOException e) {
+      assertExceptionContains("unknown time unit", e);
+    }
+    try {
+      DFSUtil.parseRelativeTime("yyz");
+    } catch (IOException e) {
+      assertExceptionContains("is not a number", e);
+    }
+    assertEquals(61*1000, DFSUtil.parseRelativeTime("61s"));
+    assertEquals(61*60*1000, DFSUtil.parseRelativeTime("61m"));
+    assertEquals(0, DFSUtil.parseRelativeTime("0s"));
+    assertEquals(25*60*60*1000, DFSUtil.parseRelativeTime("25h"));
+    assertEquals(4*24*60*60*1000l, DFSUtil.parseRelativeTime("4d"));
+    assertEquals(999*24*60*60*1000l, DFSUtil.parseRelativeTime("999d"));
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java Wed Jan 22 21:43:00 2014
@@ -32,6 +32,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -109,4 +111,39 @@ public class TestDatanodeConfig {
       throw new IOException("Bad URI", e);
     }
   }
+
+  @Test(timeout=60000)
+  public void testMemlockLimit() throws Exception {
+    assumeTrue(NativeIO.isAvailable());
+    final long memlockLimit =
+        NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
+
+    // Can't increase the memlock limit past the maximum.
+    assumeTrue(memlockLimit != Long.MAX_VALUE);
+
+    Configuration conf = cluster.getConfiguration(0);
+    long prevLimit = conf.
+        getLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+    try {
+      // Try starting the DN with limit configured to the ulimit
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          memlockLimit);
+      DataNode dn = null;
+      dn = DataNode.createDataNode(new String[]{},  conf);
+      dn.shutdown();
+      // Try starting the DN with a limit > ulimit
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          memlockLimit+1);
+      try {
+        dn = DataNode.createDataNode(new String[]{}, conf);
+      } catch (RuntimeException e) {
+        GenericTestUtils.assertExceptionContains(
+            "more than the datanode's available RLIMIT_MEMLOCK", e);
+      }
+    } finally {
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          prevLimit);
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Wed Jan 22 21:43:00 2014
@@ -440,7 +440,7 @@ public class TestPBHelper {
     };
     LocatedBlock lb = new LocatedBlock(
         new ExtendedBlock("bp12", 12345, 10, 53),
-        dnInfos, storageIDs, media, 5, false);
+        dnInfos, storageIDs, media, 5, false, new DatanodeInfo[]{});
     lb.setBlockToken(new Token<BlockTokenIdentifier>(
         "identifier".getBytes(), "password".getBytes(), new Text("kind"),
         new Text("service")));

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Wed Jan 22 21:43:00 2014
@@ -107,7 +107,7 @@ public class TestBlockManager {
           2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
       dn.updateHeartbeat(
-          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0, 0);
+          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0);
       bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCachedBlocksList {
+  public static final Log LOG = LogFactory.getLog(TestCachedBlocksList.class);
+
+  @Test(timeout=60000)
+  public void testSingleList() {
+    DatanodeDescriptor dn = new DatanodeDescriptor(
+      new DatanodeID("127.0.0.1", "localhost", "abcd",
+        5000, 5001, 5002, 5003));
+    CachedBlock[] blocks = new CachedBlock[] {
+          new CachedBlock(0L, (short)1, true),
+          new CachedBlock(1L, (short)1, true),
+          new CachedBlock(2L, (short)1, true),
+      };
+    // check that lists are empty
+    Assert.assertTrue("expected pending cached list to start off empty.", 
+        !dn.getPendingCached().iterator().hasNext());
+    Assert.assertTrue("expected cached list to start off empty.", 
+        !dn.getCached().iterator().hasNext());
+    Assert.assertTrue("expected pending uncached list to start off empty.", 
+        !dn.getPendingUncached().iterator().hasNext());
+    // add a block to the back
+    Assert.assertTrue(dn.getCached().add(blocks[0]));
+    Assert.assertTrue("expected pending cached list to still be empty.", 
+        !dn.getPendingCached().iterator().hasNext());
+    Assert.assertEquals("failed to insert blocks[0]", blocks[0],
+        dn.getCached().iterator().next());
+    Assert.assertTrue("expected pending uncached list to still be empty.", 
+        !dn.getPendingUncached().iterator().hasNext());
+    // add another block to the back
+    Assert.assertTrue(dn.getCached().add(blocks[1]));
+    Iterator<CachedBlock> iter = dn.getCached().iterator();
+    Assert.assertEquals(blocks[0], iter.next());
+    Assert.assertEquals(blocks[1], iter.next());
+    Assert.assertTrue(!iter.hasNext());
+    // add a block to the front
+    Assert.assertTrue(dn.getCached().addFirst(blocks[2]));
+    iter = dn.getCached().iterator();
+    Assert.assertEquals(blocks[2], iter.next());
+    Assert.assertEquals(blocks[0], iter.next());
+    Assert.assertEquals(blocks[1], iter.next());
+    Assert.assertTrue(!iter.hasNext());
+    // remove a block from the middle
+    Assert.assertTrue(dn.getCached().remove(blocks[0]));
+    iter = dn.getCached().iterator();
+    Assert.assertEquals(blocks[2], iter.next());
+    Assert.assertEquals(blocks[1], iter.next());
+    Assert.assertTrue(!iter.hasNext());
+    // remove all blocks
+    dn.getCached().clear();
+    Assert.assertTrue("expected cached list to be empty after clear.", 
+        !dn.getPendingCached().iterator().hasNext());
+  }
+
+  private void testAddElementsToList(CachedBlocksList list,
+      CachedBlock[] blocks) {
+    Assert.assertTrue("expected list to start off empty.", 
+        !list.iterator().hasNext());
+    for (CachedBlock block : blocks) {
+      Assert.assertTrue(list.add(block));
+    }
+  }
+
+  private void testRemoveElementsFromList(Random r,
+      CachedBlocksList list, CachedBlock[] blocks) {
+    int i = 0;
+    for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext(); ) {
+      Assert.assertEquals(blocks[i], iter.next());
+      i++;
+    }
+    if (r.nextBoolean()) {
+      LOG.info("Removing via iterator");
+      for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext() ;) {
+        iter.next();
+        iter.remove();
+      }
+    } else {
+      LOG.info("Removing in pseudo-random order");
+      CachedBlock[] remainingBlocks = Arrays.copyOf(blocks, blocks.length);
+      for (int removed = 0; removed < remainingBlocks.length; ) {
+        int toRemove = r.nextInt(remainingBlocks.length);
+        if (remainingBlocks[toRemove] != null) {
+          Assert.assertTrue(list.remove(remainingBlocks[toRemove]));
+          remainingBlocks[toRemove] = null;
+          removed++;
+        }
+      }
+    }
+    Assert.assertTrue("expected list to be empty after everything " +
+        "was removed.", !list.iterator().hasNext());
+  }
+
+  @Test(timeout=60000)
+  public void testMultipleLists() {
+    DatanodeDescriptor[] datanodes = new DatanodeDescriptor[] {
+      new DatanodeDescriptor(
+        new DatanodeID("127.0.0.1", "localhost", "abcd",
+          5000, 5001, 5002, 5003)),
+      new DatanodeDescriptor(
+        new DatanodeID("127.0.1.1", "localhost", "efgh",
+          6000, 6001, 6002, 6003)),
+    };
+    CachedBlocksList[] lists = new CachedBlocksList[] {
+        datanodes[0].getPendingCached(),
+        datanodes[0].getCached(),
+        datanodes[1].getPendingCached(),
+        datanodes[1].getCached(),
+        datanodes[1].getPendingUncached(),
+    };
+    final int NUM_BLOCKS = 8000;
+    CachedBlock[] blocks = new CachedBlock[NUM_BLOCKS];
+    for (int i = 0; i < NUM_BLOCKS; i++) {
+      blocks[i] = new CachedBlock(i, (short)i, true);
+    }
+    Random r = new Random(654);
+    for (CachedBlocksList list : lists) {
+      testAddElementsToList(list, blocks);
+    }
+    for (CachedBlocksList list : lists) {
+      testRemoveElementsFromList(r, list, blocks);
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java Wed Jan 22 21:43:00 2014
@@ -106,7 +106,7 @@ public class TestOverReplicatedBlocks {
               datanode.getStorageInfos()[0].setUtilizationForTesting(100L, 100L, 0, 100L);
               datanode.updateHeartbeat(
                   BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
-                  0, 0);
+                  0L, 0L, 0, 0);
             }
           }
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Wed Jan 22 21:43:00 2014
@@ -87,12 +87,12 @@ public class TestReplicationPolicy {
   
   private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
     long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-    int xceiverCount, int volFailures) {
+    long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
     dn.getStorageInfos()[0].setUtilizationForTesting(
         capacity, dfsUsed, remaining, blockPoolUsed);
     dn.updateHeartbeat(
         BlockManagerTestUtil.getStorageReportsForDatanode(dn),
-        xceiverCount, volFailures);
+        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
   }
 
   @BeforeClass
@@ -133,7 +133,7 @@ public class TestReplicationPolicy {
     for (int i=0; i < NUM_OF_DATANODES; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }    
   }
 
@@ -157,7 +157,8 @@ public class TestReplicationPolicy {
   public void testChooseTarget1() throws Exception {
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        0L, 0L, 4, 0); // overloaded
 
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
@@ -187,7 +188,7 @@ public class TestReplicationPolicy {
     
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
 
   private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
@@ -310,7 +311,8 @@ public class TestReplicationPolicy {
     // make data node 0 to be not qualified to choose
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
+        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
+        0L, 0L, 0, 0); // no space
         
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
@@ -343,7 +345,7 @@ public class TestReplicationPolicy {
 
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
   
   /**
@@ -360,7 +362,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
       
     DatanodeStorageInfo[] targets;
@@ -388,7 +390,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
   }
 
@@ -450,7 +452,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
     
     final LogVerificationAppender appender = new LogVerificationAppender();
@@ -475,7 +477,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Wed Jan 22 21:43:00 2014
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,7 +47,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestReplicationPolicyWithNodeGroup extends TestCase {
+public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
   private static final int NUM_OF_DATANODES_BOUNDARY = 6;
@@ -145,19 +148,20 @@ public class TestReplicationPolicyWithNo
   
   private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int volFailures) {
+      long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
+      int volFailures) {
     dn.getStorageInfos()[0].setUtilizationForTesting(
         capacity, dfsUsed, remaining, blockPoolUsed);
     dn.updateHeartbeat(
         BlockManagerTestUtil.getStorageReportsForDatanode(dn),
-        xceiverCount, volFailures);
+        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
   }
 
   private static void setupDataNodeCapacity() {
     for(int i=0; i<NUM_OF_DATANODES; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
   }
   
@@ -236,10 +240,12 @@ public class TestReplicationPolicyWithNo
    * the 1st is on dataNodes[0] and the 2nd is on a different rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget1() throws Exception {
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        0L, 0L, 4, 0); // overloaded
 
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
@@ -276,7 +282,7 @@ public class TestReplicationPolicyWithNo
 
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
 
   private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
@@ -295,6 +301,7 @@ public class TestReplicationPolicyWithNo
    * node group, and the rest should be placed on a third rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget2() throws Exception {
     DatanodeStorageInfo[] targets;
     BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
@@ -338,11 +345,13 @@ public class TestReplicationPolicyWithNo
    * and the rest should be placed on the third rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget3() throws Exception {
     // make data node 0 to be not qualified to choose
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
+        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
+        0L, 0L, 0, 0); // no space
 
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
@@ -373,7 +382,7 @@ public class TestReplicationPolicyWithNo
 
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
 
   /**
@@ -385,12 +394,13 @@ public class TestReplicationPolicyWithNo
    * in different node group.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget4() throws Exception {
     // make data node 0-2 to be not qualified to choose: not enough disk space
     for(int i=0; i<3; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
 
     DatanodeStorageInfo[] targets;
@@ -424,6 +434,7 @@ public class TestReplicationPolicyWithNo
    * the 3rd replica should be placed on the same rack as the 2nd replica,
    * @throws Exception
    */
+  @Test
   public void testChooseTarget5() throws Exception {
     setupDataNodeCapacity();
     DatanodeStorageInfo[] targets;
@@ -451,6 +462,7 @@ public class TestReplicationPolicyWithNo
    * the 1st replica. The 3rd replica can be placed randomly.
    * @throws Exception
    */
+  @Test
   public void testRereplicate1() throws Exception {
     setupDataNodeCapacity();
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
@@ -483,6 +495,7 @@ public class TestReplicationPolicyWithNo
    * the rest replicas can be placed randomly,
    * @throws Exception
    */
+  @Test
   public void testRereplicate2() throws Exception {
     setupDataNodeCapacity();
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
@@ -510,6 +523,7 @@ public class TestReplicationPolicyWithNo
    * the rest replicas can be placed randomly,
    * @throws Exception
    */
+  @Test
   public void testRereplicate3() throws Exception {
     setupDataNodeCapacity();
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
@@ -615,11 +629,11 @@ public class TestReplicationPolicyWithNo
       updateHeartbeatWithUsage(dataNodes[0],
                 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
                 (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE,
-                0L, 0, 0);
+                0L, 0L, 0L, 0, 0);
 
       updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
 
     DatanodeStorageInfo[] targets;
@@ -650,7 +664,7 @@ public class TestReplicationPolicyWithNo
     for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
       updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     chosenNodes.add(storagesInBoundaryCase[0]);
@@ -688,7 +702,7 @@ public class TestReplicationPolicyWithNo
     for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
       updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
 
     DatanodeStorageInfo[] targets;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java Wed Jan 22 21:43:00 2014
@@ -477,8 +477,8 @@ public class TestJspHelper {
     StorageReport[] report2 = new StorageReport[] {
         new StorageReport(dns2, false, 2500, 200, 1848, 200)
     };
-    dnDesc1.updateHeartbeat(report1, 10, 2);
-    dnDesc2.updateHeartbeat(report2, 20, 1);
+    dnDesc1.updateHeartbeat(report1, 5L, 3L, 10, 2);
+    dnDesc2.updateHeartbeat(report2, 10L, 2L, 20, 1);
 
     ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     live.add(dnDesc1);

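The TestJspHelper hunk above shows the widened DatanodeDescriptor heartbeat call: two extra long values, presumably the DataNode's cache capacity and cache used bytes, now sit between the storage reports and the existing xceiver and failed-volume counters. A minimal sketch of that call shape follows; the wrapper class and the interpretation of the two longs are illustrative assumptions, not part of this commit.

  import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
  import org.apache.hadoop.hdfs.server.protocol.StorageReport;

  class HeartbeatUpdateSketch {
    // dnDesc and report are assumed to be built as in TestJspHelper above.
    static void reportWithCacheStats(DatanodeDescriptor dnDesc,
        StorageReport[] report) {
      // storage reports, cache capacity, cache used, xceiver count,
      // failed volumes
      dnDesc.updateHeartbeat(report, 5L, 3L, 10, 2);
    }
  }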
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Jan 22 21:43:00 2014
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -491,6 +491,11 @@ public class SimulatedFSDataset implemen
     return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid));
   }
 
+  @Override // FsDatasetSpi
+  public List<Long> getCacheReport(String bpid) {
+    return new LinkedList<Long>();
+  }
+
   @Override // FSDatasetMBean
   public long getCapacity() {
     return storage.getCapacity();
@@ -516,6 +521,31 @@ public class SimulatedFSDataset implemen
     return storage.getNumFailedVolumes();
   }
 
+  @Override // FSDatasetMBean
+  public long getCacheUsed() {
+    return 0l;
+  }
+
+  @Override // FSDatasetMBean
+  public long getCacheCapacity() {
+    return 0l;
+  }
+
+  @Override // FSDatasetMBean
+  public long getNumBlocksCached() {
+    return 0l;
+  }
+
+  @Override
+  public long getNumBlocksFailedToCache() {
+    return 0l;
+  }
+
+  @Override
+  public long getNumBlocksFailedToUncache() {
+    return 0l;
+  }
+
   @Override // FsDatasetSpi
   public synchronized long getLength(ExtendedBlock b) throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -585,6 +615,18 @@ public class SimulatedFSDataset implemen
     }
   }
 
+  @Override // FSDatasetSpi
+  public void cache(String bpid, long[] cacheBlks) {
+    throw new UnsupportedOperationException(
+        "SimulatedFSDataset does not support cache operation!");
+  }
+
+  @Override // FSDatasetSpi
+  public void uncache(String bpid, long[] uncacheBlks) {
+    throw new UnsupportedOperationException(
+        "SimulatedFSDataset does not support uncache operation!");
+  }
+
   private BInfo getBInfo(final ExtendedBlock b) {
     final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
     return map == null? null: map.get(b.getLocalBlock());

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Wed Jan 22 21:43:00 2014
@@ -128,6 +128,8 @@ public class TestBPOfferService {
       .when(mock).sendHeartbeat(
           Mockito.any(DatanodeRegistration.class),
           Mockito.any(StorageReport[].class),
+          Mockito.anyLong(),
+          Mockito.anyLong(),
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.anyInt());

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Wed Jan 22 21:43:00 2014
@@ -154,6 +154,8 @@ public class TestBlockRecovery {
     when(namenode.sendHeartbeat(
             Mockito.any(DatanodeRegistration.class),
             Mockito.any(StorageReport[].class),
+            Mockito.anyLong(),
+            Mockito.anyLong(),
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.anyInt()))

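TestBPOfferService and TestBlockRecovery pick up the same change: the mocked sendHeartbeat now needs two additional anyLong() matchers, matching the cache capacity and cache used arguments added to DatanodeProtocol.sendHeartbeat. A hedged, self-contained sketch of that stubbing pattern follows; the helper class and method names are illustrative assumptions, while the matcher order comes from the hunks above.

  import java.io.IOException;

  import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
  import org.apache.hadoop.hdfs.server.protocol.StorageReport;
  import org.mockito.Mockito;

  class HeartbeatStubSketch {
    // mockNN is assumed to be a Mockito mock or spy of a DatanodeProtocol
    // implementation, as in the tests above.
    static void stubHeartbeat(DatanodeProtocol mockNN,
        HeartbeatResponse response) throws IOException {
      Mockito.doReturn(response).when(mockNN).sendHeartbeat(
          Mockito.any(DatanodeRegistration.class),
          Mockito.any(StorageReport[].class),
          Mockito.anyLong(),   // cache capacity, new in this change
          Mockito.anyLong(),   // cache used, new in this change
          Mockito.anyInt(),
          Mockito.anyInt(),
          Mockito.anyInt());
    }
  }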
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java Wed Jan 22 21:43:00 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
@@ -36,7 +37,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -54,7 +56,7 @@ public class TestCachingStrategy {
     EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
 
     // Track calls to posix_fadvise.
-    NativeIO.POSIX.cacheTracker = tracker;
+    NativeIO.POSIX.setCacheManipulator(tracker);
     
     // Normally, we wait for a few megabytes of data to be read or written 
     // before dropping the cache.  This is to avoid an excessive number of
@@ -106,12 +108,13 @@ public class TestCachingStrategy {
     }
   }
 
-  private static class TestRecordingCacheTracker implements CacheTracker {
+  private static class TestRecordingCacheTracker extends CacheManipulator {
     private final Map<String, Stats> map = new TreeMap<String, Stats>();
 
     @Override
-    synchronized public void fadvise(String name,
-        long offset, long len, int flags) {
+    public void posixFadviseIfPossible(String name,
+      FileDescriptor fd, long offset, long len, int flags)
+          throws NativeIOException {
       if ((len < 0) || (len > Integer.MAX_VALUE)) {
         throw new RuntimeException("invalid length of " + len +
             " passed to posixFadviseIfPossible");
@@ -126,6 +129,7 @@ public class TestCachingStrategy {
         map.put(name, stats);
       }
       stats.fadvise((int)offset, (int)len, flags);
+      super.posixFadviseIfPossible(name, fd, offset, len, flags);
     }
 
     synchronized void clear() {

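The TestCachingStrategy change replaces the old CacheTracker hook with a CacheManipulator subclass installed through NativeIO.POSIX.setCacheManipulator(), and the override now delegates to super so the real posix_fadvise call is still made. A hedged sketch of that pattern outside the test follows; the class name and the logging are assumptions, and only the override signature and the set/get calls come from the hunks above and from TestFsDatasetCache below.

  import java.io.FileDescriptor;

  import org.apache.hadoop.io.nativeio.NativeIO;
  import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
  import org.apache.hadoop.io.nativeio.NativeIOException;

  class LoggingCacheManipulator extends CacheManipulator {
    @Override
    public void posixFadviseIfPossible(String name, FileDescriptor fd,
        long offset, long len, int flags) throws NativeIOException {
      System.out.println("fadvise " + name + " offset=" + offset
          + " len=" + len + " flags=" + flags);
      // Delegate so the underlying fadvise behaviour is preserved.
      super.posixFadviseIfPossible(name, fd, offset, len, flags);
    }

    static void runWithLogging(Runnable body) {
      CacheManipulator previous = NativeIO.POSIX.getCacheManipulator();
      NativeIO.POSIX.setCacheManipulator(new LoggingCacheManipulator());
      try {
        body.run();
      } finally {
        // Restore the previous manipulator, mirroring the tearDown pattern
        // used by TestFsDatasetCache below.
        NativeIO.POSIX.setCacheManipulator(previous);
      }
    }
  }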
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import com.google.common.base.Supplier;
+
+public class TestFsDatasetCache {
+  private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
+
+  // Most Linux installs allow a default of 64KB locked memory
+  private static final long CACHE_CAPACITY = 64 * 1024;
+  // mlock always locks the entire page. So we don't need to deal with this
+  // rounding, use the OS page size for the block size.
+  private static final long PAGE_SIZE =
+      NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
+  private static final long BLOCK_SIZE = PAGE_SIZE;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster = null;
+  private static FileSystem fs;
+  private static NameNode nn;
+  private static FSImage fsImage;
+  private static DataNode dn;
+  private static FsDatasetSpi<?> fsd;
+  private static DatanodeProtocolClientSideTranslatorPB spyNN;
+  private static PageRounder rounder = new PageRounder();
+  private static CacheManipulator prevCacheManipulator;
+
+  static {
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
+    LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    assumeTrue(!Path.WINDOWS);
+    conf = new HdfsConfiguration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        CACHE_CAPACITY);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+    fsImage = nn.getFSImage();
+    dn = cluster.getDataNodes().get(0);
+    fsd = dn.getFSDataset();
+
+    spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+  }
+
+  private static void setHeartbeatResponse(DatanodeCommand[] cmds)
+      throws IOException {
+    HeartbeatResponse response = new HeartbeatResponse(
+        cmds,
+        new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
+        fsImage.getLastAppliedOrWrittenTxId()));
+    doReturn(response).when(spyNN).sendHeartbeat(
+        (DatanodeRegistration) any(),
+        (StorageReport[]) any(), anyLong(), anyLong(),
+        anyInt(), anyInt(), anyInt());
+  }
+
+  private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
+    return cacheBlocks(new HdfsBlockLocation[] {loc});
+  }
+
+  private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
+    return new DatanodeCommand[] {
+        getResponse(locs, DatanodeProtocol.DNA_CACHE)
+    };
+  }
+
+  private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) {
+    return uncacheBlocks(new HdfsBlockLocation[] {loc});
+  }
+
+  private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
+    return new DatanodeCommand[] {
+        getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
+    };
+  }
+
+  /**
+   * Creates a cache or uncache DatanodeCommand from an array of locations
+   */
+  private static DatanodeCommand getResponse(HdfsBlockLocation[] locs,
+      int action) {
+    String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
+    long[] blocks = new long[locs.length];
+    for (int i=0; i<locs.length; i++) {
+      blocks[i] = locs[i].getLocatedBlock().getBlock().getBlockId();
+    }
+    return new BlockIdCommand(action, bpid, blocks);
+  }
+
+  private static long[] getBlockSizes(HdfsBlockLocation[] locs)
+      throws Exception {
+    long[] sizes = new long[locs.length];
+    for (int i=0; i<locs.length; i++) {
+      HdfsBlockLocation loc = locs[i];
+      String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
+      Block block = loc.getLocatedBlock().getBlock().getLocalBlock();
+      ExtendedBlock extBlock = new ExtendedBlock(bpid, block);
+      FileChannel blockChannel =
+          ((FileInputStream)fsd.getBlockInputStream(extBlock, 0)).getChannel();
+      sizes[i] = blockChannel.size();
+    }
+    return sizes;
+  }
+
+  /**
+   * Blocks until cache usage hits the expected new value.
+   */
+  private long verifyExpectedCacheUsage(final long expectedCacheUsed,
+      final long expectedBlocks) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      private int tries = 0;
+      
+      @Override
+      public Boolean get() {
+        long curCacheUsed = fsd.getCacheUsed();
+        long curBlocks = fsd.getNumBlocksCached();
+        if ((curCacheUsed != expectedCacheUsed) ||
+            (curBlocks != expectedBlocks)) {
+          if (tries++ > 10) {
+            LOG.info("verifyExpectedCacheUsage: have " +
+                curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+                curBlocks + "/" + expectedBlocks + " blocks cached. " +
+                "memlock limit = " +
+                NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
+                ".  Waiting...");
+          }
+          return false;
+        }
+        return true;
+      }
+    }, 100, 60000);
+    return expectedCacheUsed;
+  }
+
+  private void testCacheAndUncacheBlock() throws Exception {
+    LOG.info("beginning testCacheAndUncacheBlock");
+    final int NUM_BLOCKS = 5;
+
+    verifyExpectedCacheUsage(0, 0);
+    assertEquals(0, fsd.getNumBlocksCached());
+
+    // Write a test file
+    final Path testFile = new Path("/testCacheBlock");
+    final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
+    DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAl);
+
+    // Get the details of the written file
+    HdfsBlockLocation[] locs =
+        (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
+    assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
+    final long[] blockSizes = getBlockSizes(locs);
+
+    // Check initial state
+    final long cacheCapacity = fsd.getCacheCapacity();
+    long cacheUsed = fsd.getCacheUsed();
+    long current = 0;
+    assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+    assertEquals("Unexpected amount of cache used", current, cacheUsed);
+
+    MetricsRecordBuilder dnMetrics;
+    long numCacheCommands = 0;
+    long numUncacheCommands = 0;
+
+    // Cache each block in succession, checking each time
+    for (int i=0; i<NUM_BLOCKS; i++) {
+      setHeartbeatResponse(cacheBlock(locs[i]));
+      current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
+      dnMetrics = getMetrics(dn.getMetrics().name());
+      long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+      assertTrue("Expected more cache requests from the NN ("
+          + cmds + " <= " + numCacheCommands + ")",
+           cmds > numCacheCommands);
+      numCacheCommands = cmds;
+    }
+
+    // Uncache each block in succession, again checking each time
+    for (int i=0; i<NUM_BLOCKS; i++) {
+      setHeartbeatResponse(uncacheBlock(locs[i]));
+      current = verifyExpectedCacheUsage(current - blockSizes[i],
+          NUM_BLOCKS - 1 - i);
+      dnMetrics = getMetrics(dn.getMetrics().name());
+      long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+      assertTrue("Expected more uncache requests from the NN",
+           cmds > numUncacheCommands);
+      numUncacheCommands = cmds;
+    }
+    LOG.info("finishing testCacheAndUncacheBlock");
+  }
+
+  @Test(timeout=600000)
+  public void testCacheAndUncacheBlockSimple() throws Exception {
+    testCacheAndUncacheBlock();
+  }
+
+  /**
+   * Run testCacheAndUncacheBlock with some failures injected into the mlock
+   * call.  This tests the ability of the NameNode to resend commands.
+   */
+  @Test(timeout=600000)
+  public void testCacheAndUncacheBlockWithRetries() throws Exception {
+    // We don't have to save the previous cacheManipulator
+    // because it will be reinstalled by the @After function.
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
+      private final Set<String> seenIdentifiers = new HashSet<String>();
+      
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        if (seenIdentifiers.contains(identifier)) {
+          // mlock succeeds the second time.
+          LOG.info("mlocking " + identifier);
+          return;
+        }
+        seenIdentifiers.add(identifier);
+        throw new IOException("injecting IOException during mlock of " +
+            identifier);
+      }
+    });
+    testCacheAndUncacheBlock();
+  }
+
+  @Test(timeout=600000)
+  public void testFilesExceedMaxLockedMemory() throws Exception {
+    LOG.info("beginning testFilesExceedMaxLockedMemory");
+
+    // Create some test files that will exceed total cache capacity
+    final int numFiles = 5;
+    final long fileSize = CACHE_CAPACITY / (numFiles-1);
+
+    final Path[] testFiles = new Path[numFiles];
+    final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
+    final long[] fileSizes = new long[numFiles];
+    for (int i=0; i<numFiles; i++) {
+      testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
+      DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short)1, 0xDFAl);
+      fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations(
+          testFiles[i], 0, fileSize);
+      // Get the file size (sum of blocks)
+      long[] sizes = getBlockSizes(fileLocs[i]);
+      for (int j=0; j<sizes.length; j++) {
+        fileSizes[i] += sizes[j];
+      }
+    }
+
+    // Cache the first n-1 files
+    long total = 0;
+    verifyExpectedCacheUsage(0, 0);
+    for (int i=0; i<numFiles-1; i++) {
+      setHeartbeatResponse(cacheBlocks(fileLocs[i]));
+      total = verifyExpectedCacheUsage(
+          rounder.round(total + fileSizes[i]), 4 * (i + 1));
+    }
+
+    // nth file should hit a capacity exception
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int lines = appender.countLinesWithMessage(
+            "more bytes in the cache: " +
+            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+        return lines > 0;
+      }
+    }, 500, 30000);
+    // Also check the metrics for the failure
+    assertTrue("Expected more than 0 failed cache attempts",
+        fsd.getNumBlocksFailedToCache() > 0);
+
+    // Uncache the n-1 files
+    for (int i=0; i<numFiles-1; i++) {
+      setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
+      total -= rounder.round(fileSizes[i]);
+      verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i));
+    }
+    LOG.info("finishing testFilesExceedMaxLockedMemory");
+  }
+
+  @Test(timeout=600000)
+  public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
+    LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
+    final int NUM_BLOCKS = 5;
+
+    verifyExpectedCacheUsage(0, 0);
+
+    // Write a test file
+    final Path testFile = new Path("/testCacheBlock");
+    final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
+    DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAl);
+
+    // Get the details of the written file
+    HdfsBlockLocation[] locs =
+        (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
+    assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
+    final long[] blockSizes = getBlockSizes(locs);
+
+    // Check initial state
+    final long cacheCapacity = fsd.getCacheCapacity();
+    long cacheUsed = fsd.getCacheUsed();
+    long current = 0;
+    assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+    assertEquals("Unexpected amount of cache used", current, cacheUsed);
+
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        LOG.info("An mlock operation is starting on " + identifier);
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          Assert.fail();
+        }
+      }
+    });
+    // Starting caching each block in succession.  The usedBytes amount
+    // should increase, even though caching doesn't complete on any of them.
+    for (int i=0; i<NUM_BLOCKS; i++) {
+      setHeartbeatResponse(cacheBlock(locs[i]));
+      current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
+    }
+    
+    setHeartbeatResponse(new DatanodeCommand[] {
+      getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
+    });
+
+    // wait until all caching jobs are finished cancelling.
+    current = verifyExpectedCacheUsage(0, 0);
+    LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
+  }
+
+  @Test(timeout=60000)
+  public void testUncacheUnknownBlock() throws Exception {
+    // Create a file
+    Path fileName = new Path("/testUncacheUnknownBlock");
+    int fileLen = 4096;
+    DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
+    HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
+        fileName, 0, fileLen);
+
+    // Try to uncache it without caching it first
+    setHeartbeatResponse(uncacheBlocks(locs));
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return fsd.getNumBlocksFailedToUncache() > 0;
+      }
+    }, 100, 10000);
+  }
+
+  @Test(timeout=60000)
+  public void testPageRounder() throws Exception {
+    // Write a small file
+    Path fileName = new Path("/testPageRounder");
+    final int smallBlocks = 512; // This should be smaller than the page size
+    assertTrue("Page size should be greater than smallBlocks!",
+        PAGE_SIZE > smallBlocks);
+    final int numBlocks = 5;
+    final int fileLen = smallBlocks * numBlocks;
+    FSDataOutputStream out =
+        fs.create(fileName, false, 4096, (short)1, smallBlocks);
+    out.write(new byte[fileLen]);
+    out.close();
+    HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
+        fileName, 0, fileLen);
+    // Cache the file and check the sizes match the page size
+    setHeartbeatResponse(cacheBlocks(locs));
+    verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks);
+    // Uncache and check that it decrements by the page size too
+    setHeartbeatResponse(uncacheBlocks(locs));
+    verifyExpectedCacheUsage(0, 0);
+  }
+
+  @Test(timeout=60000)
+  public void testUncacheQuiesces() throws Exception {
+    // Create a file
+    Path fileName = new Path("/testUncacheQuiesces");
+    int fileLen = 4096;
+    DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
+    // Cache it
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPool("pool").setPath(fileName).setReplication((short)3).build());
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        return blocksCached > 0;
+      }
+    }, 1000, 30000);
+    // Uncache it
+    dfs.removeCacheDirective(1);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksUncached =
+            MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+        return blocksUncached > 0;
+      }
+    }, 1000, 30000);
+    // Make sure that no additional messages were sent
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
+    MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
+  }
+}

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

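The new TestFsDatasetCache above drives caching mostly through faked heartbeat responses, but testUncacheQuiesces also exercises the public client path: add a cache pool, add a directive, later remove it, and watch the DataNode's BlocksCached and BlocksUncached metrics tick over. A hedged sketch of that client flow follows; the wrapper class is an illustrative assumption, and it assumes addCacheDirective returns the new directive's id, which the test's hard-coded removeCacheDirective(1) suggests.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DistributedFileSystem;
  import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
  import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

  class CacheDirectiveSketch {
    // dfs is assumed to be an already initialised DistributedFileSystem,
    // for example cluster.getFileSystem() as in the test above.
    static void cacheAndUncache(DistributedFileSystem dfs, Path file)
        throws Exception {
      dfs.addCachePool(new CachePoolInfo("pool"));
      long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
          .setPool("pool")
          .setPath(file)
          .setReplication((short) 1)
          .build());
      // The NameNode now sends DNA_CACHE commands in heartbeat responses;
      // removing the directive later triggers DNA_UNCACHE.
      dfs.removeCacheDirective(id);
    }
  }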
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java Wed Jan 22 21:43:00 2014
@@ -101,7 +101,7 @@ public class TestStorageReport {
     Mockito.verify(nnSpy).sendHeartbeat(
         any(DatanodeRegistration.class),
         captor.capture(),
-        anyInt(), anyInt(), anyInt());
+        anyLong(), anyLong(), anyInt(), anyInt(), anyInt());
 
     StorageReport[] reports = captor.getValue();
 


