Author: dhruba
Date: Fri Dec 10 22:29:38 2010
New Revision: 1044523
URL: http://svn.apache.org/viewvc?rev=1044523&view=rev
Log:
MAPREDUCE-1831. BlockPlacement policy for HDFS-RAID.
(Scott Chen via dhruba)
Added:
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1044523&r1=1044522&r2=1044523&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 10 22:29:38 2010
@@ -15,6 +15,9 @@ Trunk (unreleased changes)
MAPREDUCE-2215. A more elegant FileSystem#listCorruptFileBlocks API
(RAID changes) (Patrick Kling via hairong)
+ MAPREDUCE-1831. BlockPlacement policy for HDFS-RAID.
+ (Scott Chen via dhruba)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java?rev=1044523&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
Fri Dec 10 22:29:38 2010
@@ -0,0 +1,627 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Comparator;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This BlockPlacementPolicy spreads out the group of blocks which used by RAID
+ * for recovering each other. This is important for the availability
+ * of the blocks. This class can be used by multiple threads. It has to be
+ * thread safe.
+ */
+public class BlockPlacementPolicyRaid extends BlockPlacementPolicy {
+ public static final Log LOG =
+ LogFactory.getLog(BlockPlacementPolicyRaid.class);
+ Configuration conf;
+ private int stripeLength;
+ private int xorParityLength;
+ private int rsParityLength;
+ private String xorPrefix = null;
+ private String rsPrefix = null;
+ private String raidTempPrefix = null;
+ private String raidrsTempPrefix = null;
+ private String raidHarTempPrefix = null;
+ private String raidrsHarTempPrefix = null;
+ private FSNamesystem namesystem = null;
+ private BlockPlacementPolicyDefault defaultPolicy;
+
+ CachedLocatedBlocks cachedLocatedBlocks;
+ CachedFullPathNames cachedFullPathNames;
+
+ /** {@inheritDoc} */
+ @Override
+ public void initialize(Configuration conf, FSClusterStats stats,
+ NetworkTopology clusterMap) {
+ this.conf = conf;
+ this.stripeLength = RaidNode.getStripeLength(conf);
+ this.rsParityLength = RaidNode.rsParityLength(conf);
+ this.xorParityLength = 1;
+ try {
+ this.xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+ this.rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
+ } catch (IOException e) {
+ }
+ if (this.xorPrefix == null) {
+ this.xorPrefix = RaidNode.DEFAULT_RAID_LOCATION;
+ }
+ if (this.rsPrefix == null) {
+ this.rsPrefix = RaidNode.DEFAULT_RAIDRS_LOCATION;
+ }
+ // Throws ClassCastException if we cannot cast here.
+ this.namesystem = (FSNamesystem) stats;
+ this.cachedLocatedBlocks = new CachedLocatedBlocks(namesystem);
+ this.cachedFullPathNames = new CachedFullPathNames(namesystem);
+ this.raidTempPrefix = RaidNode.xorTempPrefix(conf);
+ this.raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
+ this.raidHarTempPrefix = RaidNode.xorHarTempPrefix(conf);
+ this.raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
+ defaultPolicy = new BlockPlacementPolicyDefault(conf, stats, clusterMap);
+ }
+
+ @Override
+ DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
+ DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
+ long blocksize) {
+ HashMap<Node, Node> excluded = new HashMap<Node, Node>();
+ return chooseTarget(srcPath, numOfReplicas, writer, chosenNodes,
+ excluded, blocksize);
+ }
+
+ @Override
+ DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
+ DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
+ HashMap<Node, Node> excludedNodes, long blocksize) {
+ try {
+ if (excludedNodes == null) {
+ excludedNodes = new HashMap<Node, Node>();
+ }
+ addExcludedNodes(srcPath, excludedNodes);
+ DatanodeDescriptor[] result =
+ defaultPolicy.chooseTarget(numOfReplicas, writer,
+ chosenNodes, excludedNodes, blocksize);
+ cachedLocatedBlocks.get(srcPath).
+ add(new LocatedBlock(new Block(), result));
+ return result;
+ } catch (Exception e) {
+ String trace = StringUtils.stringifyException(e);
+ System.out.println(trace);
+ FSNamesystem.LOG.debug(
+ "Error happend when choosing datanode to write.", e);
+ return defaultPolicy.chooseTarget(srcPath, numOfReplicas, writer,
+ chosenNodes, blocksize);
+ }
+ }
+
+ @Override
+ public int verifyBlockPlacement(String srcPath, LocatedBlock lBlk,
+ int minRacks) {
+ return defaultPolicy.verifyBlockPlacement(srcPath, lBlk, minRacks);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+ Block block, short replicationFactor,
+ Collection<DatanodeDescriptor> first,
+ Collection<DatanodeDescriptor> second) {
+
+ DatanodeDescriptor chosenNode = null;
+ try {
+ String path = cachedFullPathNames.get(inode);
+ List<LocatedBlock> companionBlocks = getCompanionBlocks(path, block);
+ if (companionBlocks == null || companionBlocks.size() == 0) {
+ // Use the default method if it is not a valid raided or parity file
+ return defaultPolicy.chooseReplicaToDelete(
+ inode, block, replicationFactor, first, second);
+ }
+ // Delete from the first collection first
+ // This ensures the number of unique rack of this block is not reduced
+ Collection<DatanodeDescriptor> all = new HashSet<DatanodeDescriptor>();
+ all.addAll(first);
+ all.addAll(second);
+ chosenNode = chooseReplicaToDelete(companionBlocks, all);
+ if (chosenNode != null) {
+ return chosenNode;
+ }
+ return defaultPolicy.chooseReplicaToDelete(
+ inode, block, replicationFactor, first, second);
+ } catch (Exception e) {
+ LOG.debug("Failed to choose the correct replica to delete", e);
+ return defaultPolicy.chooseReplicaToDelete(
+ inode, block, replicationFactor, first, second);
+ }
+ }
+
+ /**
+ * Obtain the excluded nodes for the current block that is being written
+ */
+ void addExcludedNodes(String file, HashMap<Node, Node> excluded)
+ throws IOException {
+ for (LocatedBlock b : getCompanionBlocks(file)) {
+ for (Node n : b.getLocations()) {
+ excluded.put(n, n);
+ }
+ }
+ }
+
+ private DatanodeDescriptor chooseReplicaToDelete(
+ Collection<LocatedBlock> companionBlocks,
+ Collection<DatanodeDescriptor> dataNodes) throws IOException {
+
+ if (dataNodes.isEmpty()) {
+ return null;
+ }
+ // Count the number of replicas on each node and rack
+ final Map<String, Integer> nodeCompanionBlockCount =
+ countCompanionBlocks(companionBlocks, false);
+ final Map<String, Integer> rackCompanionBlockCount =
+ countCompanionBlocks(companionBlocks, true);
+
+ NodeComparator comparator =
+ new NodeComparator(nodeCompanionBlockCount, rackCompanionBlockCount);
+ return Collections.max(dataNodes, comparator);
+ }
+
+ /**
+ * Count how many companion blocks are on each datanode or the each rack
+ * @param companionBlocks a collection of all the companion blocks
+ * @param doRackCount count the companion blocks on the racks of datanodes
+ * @param result the map from node name to the number of companion blocks
+ */
+ static Map<String, Integer> countCompanionBlocks(
+ Collection<LocatedBlock> companionBlocks, boolean doRackCount) {
+ Map<String, Integer> result = new HashMap<String, Integer>();
+ for (LocatedBlock block : companionBlocks) {
+ for (DatanodeInfo d : block.getLocations()) {
+ String name = doRackCount ? d.getParent().getName() : d.getName();
+ if (result.containsKey(name)) {
+ int count = result.get(name) + 1;
+ result.put(name, count);
+ } else {
+ result.put(name, 1);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Compares the datanodes based on the number of companion blocks on the same
+ * node and rack. If even, compare the remaining space on the datanodes.
+ */
+ class NodeComparator implements Comparator<DatanodeDescriptor> {
+ private Map<String, Integer> nodeBlockCount;
+ private Map<String, Integer> rackBlockCount;
+ private NodeComparator(Map<String, Integer> nodeBlockCount,
+ Map<String, Integer> rackBlockCount) {
+ this.nodeBlockCount = nodeBlockCount;
+ this.rackBlockCount = rackBlockCount;
+ }
+ @Override
+ public int compare(DatanodeDescriptor d1, DatanodeDescriptor d2) {
+ int res = compareBlockCount(d1, d2, nodeBlockCount);
+ if (res != 0) {
+ return res;
+ }
+ res = compareBlockCount(d1.getParent(), d2.getParent(), rackBlockCount);
+ if (res != 0) {
+ return res;
+ }
+ if (d1.getRemaining() > d2.getRemaining()) {
+ return -1;
+ }
+ if (d1.getRemaining() < d2.getRemaining()) {
+ return 1;
+ }
+ return 0;
+ }
+ private int compareBlockCount(Node node1, Node node2,
+ Map<String, Integer> blockCount) {
+ Integer count1 = blockCount.get(node1.getName());
+ Integer count2 = blockCount.get(node2.getName());
+ count1 = count1 == null ? 0 : count1;
+ count2 = count2 == null ? 0 : count2;
+ if (count1 > count2) {
+ return 1;
+ }
+ if (count1 < count2) {
+ return -1;
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * Obtain the companion blocks of the block that is currently being written.
+ * Companion blocks are defined as the blocks that can help recover each
+ * others by using raid decoder.
+ * @param path the path of the file contains the block
+ * @return the block locations of companion blocks
+ */
+ List<LocatedBlock> getCompanionBlocks(String path)
+ throws IOException {
+ // This will be the index of the block which is currently being written
+ int blockIndex = cachedLocatedBlocks.get(path).size();
+ return getCompanionBlocks(path, blockIndex);
+ }
+
+ /**
+ * Obtain the companion blocks of the give block
+ * Companion blocks are defined as the blocks that can help recover each
+ * others by using raid decoder.
+ * @param path the path of the file contains the block
+ * @param block the given block
+ * @return the block locations of companion blocks
+ */
+ List<LocatedBlock> getCompanionBlocks(String path, Block block)
+ throws IOException {
+ int blockIndex = getBlockIndex(path, block);
+ return getCompanionBlocks(path, blockIndex);
+ }
+
+ List<LocatedBlock> getCompanionBlocks(String path, int blockIndex)
+ throws IOException {
+ if (isXorHarTempParityFile(path)) {
+ // temp har xor parity file
+ return getCompanionBlocksForHarParityBlock(
+ path, xorParityLength, blockIndex);
+ }
+ if (isRsHarTempParityFile(path)) {
+ // temp har rs parity file
+ return getCompanionBlocksForHarParityBlock(
+ path, rsParityLength, blockIndex);
+ }
+ if (isXorTempParityFile(path)) {
+ // temp xor parity file
+ return getCompanionBlocksForParityBlock(
+ getSourceFile(path, raidTempPrefix), path,
+ xorParityLength, blockIndex);
+ }
+ if (isRsTempParityFile(path)) {
+ // temp rs parity file
+ return getCompanionBlocksForParityBlock(
+ getSourceFile(path, raidrsTempPrefix), path,
+ rsParityLength, blockIndex);
+ }
+ if (isXorParityFile(path)) {
+ // xor parity file
+ return getCompanionBlocksForParityBlock(getSourceFile(path, xorPrefix),
+ path, xorParityLength, blockIndex);
+ }
+ if (isRsParityFile(path)) {
+ // rs parity file
+ return getCompanionBlocksForParityBlock(getSourceFile(path, rsPrefix),
+ path, rsParityLength, blockIndex);
+ }
+ String parity = getParityFile(path);
+ if (parity == null) {
+ // corresponding parity file not found.
+ // return an empty list
+ return new ArrayList<LocatedBlock>();
+ }
+ if (isXorParityFile(parity)) {
+ // xor raided source file
+ return getCompanionBlocksForSourceBlock(
+ path, parity, xorParityLength, blockIndex);
+ }
+ if (isRsParityFile(parity)) {
+ // rs raided source file
+ return getCompanionBlocksForSourceBlock(
+ path, parity, rsParityLength, blockIndex);
+ }
+ // return an empty list
+ return new ArrayList<LocatedBlock>();
+ }
+
+ private List<LocatedBlock> getCompanionBlocksForHarParityBlock(
+ String parity, int parityLength, int blockIndex)
+ throws IOException {
+ // consider only parity file in this case because source file block
+ // location is not easy to obtain
+ List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
+ List<LocatedBlock> result = new ArrayList<LocatedBlock>();
+ synchronized (parityBlocks) {
+ int start = Math.max(0, blockIndex - parityLength + 1);
+ int end = Math.min(parityBlocks.size(), blockIndex + parityLength);
+ result = parityBlocks.subList(start, end);
+ }
+ return result;
+ }
+
+ private List<LocatedBlock> getCompanionBlocksForParityBlock(
+ String src, String parity, int parityLength, int blockIndex)
+ throws IOException {
+ List<LocatedBlock> result = new ArrayList<LocatedBlock>();
+ List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
+ int stripeIndex = blockIndex / parityLength;
+ synchronized (parityBlocks) {
+ int parityStart = stripeIndex * parityLength;
+ int parityEnd = Math.min(parityStart + parityLength,
+ parityBlocks.size());
+ // for parity, always consider the neighbor blocks as companion blocks
+ if (parityStart < parityBlocks.size()) {
+ result.addAll(parityBlocks.subList(parityStart, parityEnd));
+ }
+ }
+
+ if (src == null) {
+ return result;
+ }
+ List<LocatedBlock> sourceBlocks = cachedLocatedBlocks.get(src);
+ synchronized (sourceBlocks) {
+ int sourceStart = stripeIndex * stripeLength;
+ int sourceEnd = Math.min(sourceStart + stripeLength,
+ sourceBlocks.size());
+ if (sourceStart < sourceBlocks.size()) {
+ result.addAll(sourceBlocks.subList(sourceStart, sourceEnd));
+ }
+ }
+ return result;
+ }
+
+ private List<LocatedBlock> getCompanionBlocksForSourceBlock(
+ String src, String parity, int parityLength, int blockIndex)
+ throws IOException {
+ List<LocatedBlock> result = new ArrayList<LocatedBlock>();
+ List<LocatedBlock> sourceBlocks = cachedLocatedBlocks.get(src);
+ int stripeIndex = blockIndex / stripeLength;
+ synchronized (sourceBlocks) {
+ int sourceStart = stripeIndex * stripeLength;
+ int sourceEnd = Math.min(sourceStart + stripeLength,
+ sourceBlocks.size());
+ if (sourceStart < sourceBlocks.size()) {
+ result.addAll(sourceBlocks.subList(sourceStart, sourceEnd));
+ }
+ }
+ if (parity == null) {
+ return result;
+ }
+ List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
+ synchronized (parityBlocks) {
+ int parityStart = stripeIndex * parityLength;
+ int parityEnd = Math.min(parityStart + parityLength,
+ parityBlocks.size());
+ if (parityStart < parityBlocks.size()) {
+ result.addAll(parityBlocks.subList(parityStart, parityEnd));
+ }
+ }
+ return result;
+ }
+
+ private int getBlockIndex(String file, Block block) throws IOException {
+ List<LocatedBlock> blocks = cachedLocatedBlocks.get(file);
+ synchronized (blocks) {
+ for (int i = 0; i < blocks.size(); i++) {
+ if (blocks.get(i).getBlock().equals(block)) {
+ return i;
+ }
+ }
+ }
+ throw new IOException("Cannot locate " + block + " in file " + file);
+ }
+
+ /**
+ * Cache results for FSInodeInfo.getFullPathName()
+ */
+ static class CachedFullPathNames {
+ FSNamesystem namesystem;
+ CachedFullPathNames(FSNamesystem namesystem) {
+ this.namesystem = namesystem;
+ }
+ private Cache<INodeWithHashCode, String> cacheInternal =
+ new Cache<INodeWithHashCode, String>() {
+ @Override
+ public String getDirectly(INodeWithHashCode inode) throws IOException {
+ namesystem.readLock();
+ try {
+ return inode.getFullPathName();
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+ };
+
+ static private class INodeWithHashCode {
+ FSInodeInfo inode;
+ INodeWithHashCode(FSInodeInfo inode) {
+ this.inode = inode;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ return inode == obj;
+ }
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(inode);
+ }
+ String getFullPathName() {
+ return inode.getFullPathName();
+ }
+ }
+
+ public String get(FSInodeInfo inode) throws IOException {
+ return cacheInternal.get(new INodeWithHashCode(inode));
+ }
+ }
+
+ /**
+ * Cache results for FSNamesystem.getBlockLocations()
+ */
+ static class CachedLocatedBlocks extends Cache<String, List<LocatedBlock>>
{
+ FSNamesystem namesystem;
+ CachedLocatedBlocks(FSNamesystem namesystem) {
+ this.namesystem = namesystem;
+ }
+ @Override
+ public List<LocatedBlock> getDirectly(String file) throws IOException {
+ long len = namesystem.getFileInfo(file, true).getLen();
+ List<LocatedBlock> result = namesystem.getBlockLocations(
+ file, 0L, len, false, false).getLocatedBlocks();
+ if (result == null || result.isEmpty()) {
+ result = new ArrayList<LocatedBlock>();
+ }
+ return Collections.synchronizedList(result);
+ }
+ }
+
+ static abstract class Cache<K, V> {
+ private Map<K, ValueWithTime> cache;
+ private static final long CACHE_TIMEOUT = 300000L; // 5 minutes
+ // The timeout is long but the consequence of stale value is not serious
+ Cache() {
+ Map<K, ValueWithTime> map = new LinkedHashMap<K, ValueWithTime>() {
+ private static final long serialVersionUID = 1L;
+ final private int MAX_ENTRIES = 50000;
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<K, ValueWithTime> eldest) {
+ return size() > MAX_ENTRIES;
+ }
+ };
+ this.cache = Collections.synchronizedMap(map);
+ }
+
+ // Note that this method may hold FSNamesystem.readLock() and it may
+ // be called inside FSNamesystem.writeLock(). If we make this method
+ // synchronized, it will deadlock.
+ abstract protected V getDirectly(K key) throws IOException;
+
+ public V get(K key) throws IOException {
+ // The method is not synchronized so we may get some stale value here but
+ // it's OK.
+ ValueWithTime result = cache.get(key);
+ long now = System.currentTimeMillis();
+ if (result != null &&
+ now - result.cachedTime < CACHE_TIMEOUT) {
+ return result.value;
+ }
+ result = new ValueWithTime();
+ result.value = getDirectly(key);
+ result.cachedTime = now;
+ cache.put(key, result);
+ return result.value;
+ }
+ private class ValueWithTime {
+ V value = null;
+ long cachedTime = 0L;
+ }
+ }
+
+ /**
+ * Get path for the corresponding source file for a valid parity
+ * file. Returns null if it does not exists
+ * @param parity the toUri path of the parity file
+ * @return the toUri path of the source file
+ */
+ String getSourceFile(String parity, String prefix) throws IOException {
+ if (isHarFile(parity)) {
+ return null;
+ }
+ // remove the prefix
+ String src = parity.substring(prefix.length());
+ if (namesystem.dir.getFileInfo(src, true) == null) {
+ return null;
+ }
+ return src;
+ }
+
+ /**
+ * Get path for the corresponding parity file for a source file.
+ * Returns null if it does not exists
+ * @param src the toUri path of the source file
+ * @return the toUri path of the parity file
+ */
+ String getParityFile(String src) throws IOException {
+ String xorParity = getParityFile(xorPrefix, src);
+ if (xorParity != null) {
+ return xorParity;
+ }
+ String rsParity = getParityFile(rsPrefix, src);
+ if (rsParity != null) {
+ return rsParity;
+ }
+ return null;
+ }
+
+ /**
+ * Get path for the parity file. Returns null if it does not exists
+ * @param parityPrefix usuall "/raid/" or "/raidrs/"
+ * @return the toUri path of the parity file
+ */
+ private String getParityFile(String parityPrefix, String src)
+ throws IOException {
+ String parity = parityPrefix + src;
+ if (namesystem.dir.getFileInfo(parity, true) == null) {
+ return null;
+ }
+ return parity;
+ }
+
+ private boolean isHarFile(String path) {
+ return path.lastIndexOf(RaidNode.HAR_SUFFIX) != -1;
+ }
+
+ private boolean isXorHarTempParityFile(String path) {
+ return path.startsWith(raidHarTempPrefix + Path.SEPARATOR);
+ }
+
+ private boolean isRsHarTempParityFile(String path) {
+ return path.startsWith(raidrsHarTempPrefix + Path.SEPARATOR);
+ }
+
+ private boolean isXorTempParityFile(String path) {
+ return path.startsWith(raidTempPrefix + Path.SEPARATOR);
+ }
+
+ private boolean isRsTempParityFile(String path) {
+ return path.startsWith(raidrsTempPrefix + Path.SEPARATOR);
+ }
+
+ private boolean isXorParityFile(String path) {
+ return path.startsWith(xorPrefix + Path.SEPARATOR);
+ }
+
+ private boolean isRsParityFile(String path) {
+ return path.startsWith(rsPrefix + Path.SEPARATOR);
+ }
+
+
+}
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1044523&r1=1044522&r2=1044523&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Fri Dec 10 22:29:38 2010
@@ -1361,6 +1361,13 @@ public abstract class RaidNode implement
/**
* Return the temp path for XOR parity files
*/
+ public static String xorTempPrefix(Configuration conf) {
+ return conf.get(RAID_TMP_LOCATION_KEY, DEFAULT_RAID_TMP_LOCATION);
+ }
+
+ /**
+ * Return the temp path for XOR parity files
+ */
public static String xorHarTempPrefix(Configuration conf) {
return conf.get(RAID_HAR_TMP_LOCATION_KEY, DEFAULT_RAID_HAR_TMP_LOCATION);
}
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java?rev=1044523&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
Fri Dec 10 22:29:38 2010
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedFullPathNames;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedLocatedBlocks;
+import org.apache.hadoop.raid.RaidNode;
+import org.junit.Test;
+
+public class TestBlockPlacementPolicyRaid {
+ private Configuration conf = null;
+ private MiniDFSCluster cluster = null;
+ private FSNamesystem namesystem = null;
+ private BlockPlacementPolicyRaid policy = null;
+ private FileSystem fs = null;
+ String[] rack1 = {"/rack1"};
+ String[] rack2 = {"/rack2"};
+ String[] host1 = {"host1.rack1.com"};
+ String[] host2 = {"host2.rack2.com"};
+ String xorPrefix = null;
+ String raidTempPrefix = null;
+ String raidrsTempPrefix = null;
+ String raidrsHarTempPrefix = null;
+
+ final static Log LOG =
+ LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
+
+ protected void setupCluster() throws IOException {
+ conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+ conf.set("dfs.replication.pending.timeout.sec", "2");
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
+ conf.set("dfs.block.replicator.classname",
+ "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid");
+ conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
+ conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+ // start the cluster with one datanode first
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
+ format(true).racks(rack1).hosts(host1).build();
+ cluster.waitActive();
+ namesystem = cluster.getNameNode().getNamesystem();
+ Assert.assertTrue("BlockPlacementPolicy type is not correct.",
+ namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid);
+ policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator;
+ fs = cluster.getFileSystem();
+ xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+ raidTempPrefix = RaidNode.xorTempPrefix(conf);
+ raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
+ raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
+ }
+
+ /**
+ * Test that the parity files will be placed at the good locations when we
+ * create them.
+ */
+ @Test
+ public void testChooseTargetForRaidFile() throws IOException {
+ setupCluster();
+ try {
+ String src = "/dir/file";
+ String parity = raidrsTempPrefix + src;
+ DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
+ refreshPolicy();
+ setBlockPlacementPolicy(namesystem, policy);
+ // start 3 more datanodes
+ String[] racks = {"/rack2", "/rack2", "/rack2",
+ "/rack2", "/rack2", "/rack2"};
+ String[] hosts =
+ {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+ "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+ cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+ int numBlocks = 6;
+ DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
+ FileStatus srcStat = fs.getFileStatus(new Path(src));
+ BlockLocation[] srcLoc =
+ fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
+ FileStatus parityStat = fs.getFileStatus(new Path(parity));
+ BlockLocation[] parityLoc =
+ fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
+ int stripeLen = RaidNode.getStripeLength(conf);
+ for (int i = 0; i < numBlocks / stripeLen; i++) {
+ Set<String> locations = new HashSet<String>();
+ for (int j = 0; j < srcLoc.length; j++) {
+ String [] names = srcLoc[j].getNames();
+ for (int k = 0; k < names.length; k++) {
+ LOG.info("Source block location: " + names[k]);
+ locations.add(names[k]);
+ }
+ }
+ for (int j = 0 ; j < stripeLen; j++) {
+ String[] names = parityLoc[j + i * stripeLen].getNames();
+ for (int k = 0; k < names.length; k++) {
+ LOG.info("Parity block location: " + names[k]);
+ Assert.assertTrue(locations.add(names[k]));
+ }
+ }
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test that the har parity files will be placed at the good locations when we
+ * create them.
+ */
+ @Test
+ public void testChooseTargetForHarRaidFile() throws IOException {
+ setupCluster();
+ try {
+ String[] racks = {"/rack2", "/rack2", "/rack2",
+ "/rack2", "/rack2", "/rack2"};
+ String[] hosts =
+ {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+ "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+ cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+ String harParity = raidrsHarTempPrefix + "/dir/file";
+ int numBlocks = 11;
+ DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
+ FileStatus stat = fs.getFileStatus(new Path(harParity));
+ BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
+ int rsParityLength = RaidNode.rsParityLength(conf);
+ for (int i = 0; i < numBlocks - rsParityLength; i++) {
+ Set<String> locations = new HashSet<String>();
+ for (int j = 0; j < rsParityLength; j++) {
+ for (int k = 0; k < loc[i + j].getNames().length; k++) {
+ // verify that every adjacent 4 blocks are on differnt nodes
+ String name = loc[i + j].getNames()[k];
+ LOG.info("Har Raid block location: " + name);
+ Assert.assertTrue(locations.add(name));
+ }
+ }
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
+ * Verify that the results obtained from cache is the same as
+ * the results obtained directly
+ */
+ @Test
+ public void testCachedBlocks() throws IOException {
+ setupCluster();
+ try {
+ String file1 = "/dir/file1";
+ String file2 = "/dir/file2";
+ DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+ // test blocks cache
+ CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+ try {
+ Thread.sleep(1200L);
+ } catch (InterruptedException e) {
+ }
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test BlockPlacementPolicyRaid.CachedFullPathNames
+ * Verify that the results obtained from cache is the same as
+ * the results obtained directly
+ */
+ @Test
+ public void testCachedPathNames() throws IOException {
+ setupCluster();
+ try {
+ String file1 = "/dir/file1";
+ String file2 = "/dir/file2";
+ DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+ // test full path cache
+ CachedFullPathNames cachedFullPathNames =
+ new CachedFullPathNames(namesystem);
+ FSInodeInfo inode1 = null;
+ FSInodeInfo inode2 = null;
+ namesystem.dir.readLock();
+ try {
+ inode1 = namesystem.dir.rootDir.getNode(file1, true);
+ inode2 = namesystem.dir.rootDir.getNode(file2, true);
+ } finally {
+ namesystem.dir.readUnlock();
+ }
+ verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+ try {
+ Thread.sleep(1200L);
+ } catch (InterruptedException e) {
+ }
+ verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+ /**
+ * Test the result of getCompanionBlocks() on the unraided files
+ */
+ @Test
+ public void testGetCompanionBLocks() throws IOException {
+ setupCluster();
+ try {
+ String file1 = "/dir/file1";
+ String file2 = "/raid/dir/file2";
+ String file3 = "/raidrs/dir/file3";
+ // Set the policy to default policy to place the block in the default way
+ setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+ conf, namesystem, namesystem.clusterMap));
+ DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
+ Collection<LocatedBlock> companionBlocks;
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
+ Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
+ Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
+ Assert.assertEquals(1, companionBlocks.size());
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
+ Assert.assertEquals(1, companionBlocks.size());
+
+ int rsParityLength = RaidNode.rsParityLength(conf);
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
+ Assert.assertEquals(rsParityLength, companionBlocks.size());
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
+ Assert.assertEquals(rsParityLength, companionBlocks.size());
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
+ Assert.assertEquals(2, companionBlocks.size());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ static void setBlockPlacementPolicy(
+ FSNamesystem namesystem, BlockPlacementPolicy policy) {
+ namesystem.writeLock();
+ try {
+ namesystem.blockManager.replicator = policy;
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+
+ /**
+ * Test BlockPlacementPolicyRaid actually deletes the correct replica.
+ * Start 2 datanodes and create 1 source file and its parity file.
+ * 1) Start host1, create the parity file with replication 1
+ * 2) Start host2, create the source file with replication 2
+ * 3) Set repliation of source file to 1
+ * Verify that the policy should delete the block with more companion blocks.
+ */
+ @Test
+ public void testDeleteReplica() throws IOException {
+ setupCluster();
+ try {
+ // Set the policy to default policy to place the block in the default way
+ setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+ conf, namesystem, namesystem.clusterMap));
+ DatanodeDescriptor datanode1 =
+ namesystem.datanodeMap.values().iterator().next();
+ String source = "/dir/file";
+ String parity = xorPrefix + source;
+
+ final Path parityPath = new Path(parity);
+ DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, parityPath, (short)1);
+
+ // start one more datanode
+ cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
+ DatanodeDescriptor datanode2 = null;
+ for (DatanodeDescriptor d : namesystem.datanodeMap.values()) {
+ if (!d.getName().equals(datanode1.getName())) {
+ datanode2 = d;
+ }
+ }
+ Assert.assertTrue(datanode2 != null);
+ cluster.waitActive();
+ final Path sourcePath = new Path(source);
+ DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
+ DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
+
+ refreshPolicy();
+ Assert.assertEquals(parity,
+ policy.getParityFile(source));
+ Assert.assertEquals(source,
+ policy.getSourceFile(parity, xorPrefix));
+
+ List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
+ List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
+ Assert.assertEquals(5, sourceBlocks.size());
+ Assert.assertEquals(3, parityBlocks.size());
+
+ // verify the result of getCompanionBlocks()
+ Collection<LocatedBlock> companionBlocks;
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(0).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{0, 1}, new int[]{0});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(1).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{0, 1}, new int[]{0});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(2).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{2, 3}, new int[]{1});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(3).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{2, 3}, new int[]{1});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(4).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{4}, new int[]{2});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(0).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{0, 1}, new int[]{0});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(1).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{2, 3}, new int[]{1});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(2).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{4}, new int[]{2});
+
+ // Set the policy back to raid policy. We have to create a new object
+ // here to clear the block location cache
+ refreshPolicy();
+ setBlockPlacementPolicy(namesystem, policy);
+ // verify policy deletes the correct blocks. companion blocks should be
+ // evenly distributed.
+ fs.setReplication(sourcePath, (short)1);
+ DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
+ Map<String, Integer> counters = new HashMap<String, Integer>();
+ refreshPolicy();
+ for (int i = 0; i < parityBlocks.size(); i++) {
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(i).getBlock());
+
+ counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+ companionBlocks, false);
+ Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
+ counters.get(datanode1.getName()) <= 2);
+ Assert.assertTrue(counters.get(datanode1.getName()) +
+ counters.get(datanode2.getName()) ==
+ companionBlocks.size());
+
+ counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+ companionBlocks, true);
+ Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
+ counters.get(datanode1.getParent().getName()) <= 2);
+ Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
+ counters.get(datanode2.getParent().getName()) ==
+ companionBlocks.size());
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ // create a new BlockPlacementPolicyRaid to clear the cache
+ private void refreshPolicy() {
+ policy = new BlockPlacementPolicyRaid();
+ policy.initialize(conf, namesystem, namesystem.clusterMap);
+ }
+
+ private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
+ List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
+ int[] sourceBlockIndexes, int[] parityBlockIndexes) {
+ Set<Block> blockSet = new HashSet<Block>();
+ for (LocatedBlock b : companionBlocks) {
+ blockSet.add(b.getBlock());
+ }
+ Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
+ blockSet.size());
+ for (int index : sourceBlockIndexes) {
+ Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
+ }
+ for (int index : parityBlockIndexes) {
+ Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
+ }
+ }
+
+ private void verifyCachedFullPathNameResult(
+ CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
+ throws IOException {
+ String res1 = inode.getFullPathName();
+ String res2 = cachedFullPathNames.get(inode);
+ LOG.info("Actual path name: " + res1);
+ LOG.info("Cached path name: " + res2);
+ Assert.assertEquals(cachedFullPathNames.get(inode),
+ inode.getFullPathName());
+ }
+
+ private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
+ FSNamesystem namesystem, String file) throws IOException{
+ long len = namesystem.getFileInfo(file, true).getLen();
+ List<LocatedBlock> res1 = namesystem.getBlockLocations(
+ file, 0L, len, false, false).getLocatedBlocks();
+ List<LocatedBlock> res2 = cachedBlocks.get(file);
+ for (int i = 0; i < res1.size(); i++) {
+ LOG.info("Actual block: " + res1.get(i).getBlock());
+ LOG.info("Cached block: " + res2.get(i).getBlock());
+ Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
+ }
+ }
+
+ private Collection<LocatedBlock> getCompanionBlocks(
+ FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
+ Block block) throws IOException {
+ INodeFile inode = namesystem.blockManager.blocksMap.getINode(block);
+ return policy.getCompanionBlocks(inode.getFullPathName(), block);
+ }
+
+ private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
+ throws IOException {
+ long len = namesystem.getFileInfo(file, true).getLen();
+ return namesystem.getBlockLocations(
+ file, 0, len, false, false).getLocatedBlocks();
+ }
+}
|