hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r1044523 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/ src/contrib/raid/src/test...
Date Fri, 10 Dec 2010 22:29:45 GMT
Author: dhruba
Date: Fri Dec 10 22:29:38 2010
New Revision: 1044523

URL: http://svn.apache.org/viewvc?rev=1044523&view=rev
Log:
MAPREDUCE-1831. BlockPlacement policy for HDFS-RAID.
(Scott Chen via dhruba)


Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1044523&r1=1044522&r2=1044523&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 10 22:29:38 2010
@@ -15,6 +15,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2215. A more elegant FileSystem#listCorruptFileBlocks API
     (RAID changes) (Patrick Kling via hairong)
 
+    MAPREDUCE-1831. BlockPlacement policy for HDFS-RAID.
+    (Scott Chen via dhruba)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java?rev=1044523&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
Fri Dec 10 22:29:38 2010
@@ -0,0 +1,627 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Comparator;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This BlockPlacementPolicy spreads out the group of blocks which are used
+ * by RAID for recovering each other. This is important for the availability
+ * of the blocks. This class can be used by multiple threads. It has to be
+ * thread safe.
+ */
+public class BlockPlacementPolicyRaid extends BlockPlacementPolicy {
+  public static final Log LOG =
+    LogFactory.getLog(BlockPlacementPolicyRaid.class);
+  Configuration conf;
+  /** Number of source blocks in one raid stripe */
+  private int stripeLength;
+  /** Number of parity blocks per stripe for XOR (always one) */
+  private int xorParityLength;
+  /** Number of parity blocks per stripe for Reed-Solomon */
+  private int rsParityLength;
+  /** Path prefixes identifying parity files and their temp/har variants */
+  private String xorPrefix = null;
+  private String rsPrefix = null;
+  private String raidTempPrefix = null;
+  private String raidrsTempPrefix = null;
+  private String raidHarTempPrefix = null;
+  private String raidrsHarTempPrefix = null;
+  private FSNamesystem namesystem = null;
+  /** Fallback policy used whenever the raid-aware placement fails */
+  private BlockPlacementPolicyDefault defaultPolicy;
+
+  CachedLocatedBlocks cachedLocatedBlocks;
+  CachedFullPathNames cachedFullPathNames;
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize(Configuration conf,  FSClusterStats stats,
+                         NetworkTopology clusterMap) {
+    this.conf = conf;
+    this.stripeLength = RaidNode.getStripeLength(conf);
+    this.rsParityLength = RaidNode.rsParityLength(conf);
+    this.xorParityLength = 1;
+    try {
+      this.xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+      this.rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
+    } catch (IOException e) {
+      // Do not fail initialization: fall through and use the default
+      // locations below, but record why the configured paths were unusable.
+      LOG.warn("Cannot obtain raid destination paths from configuration", e);
+    }
+    if (this.xorPrefix == null) {
+      this.xorPrefix = RaidNode.DEFAULT_RAID_LOCATION;
+    }
+    if (this.rsPrefix == null) {
+      this.rsPrefix = RaidNode.DEFAULT_RAIDRS_LOCATION;
+    }
+    // Throws ClassCastException if we cannot cast here.
+    this.namesystem = (FSNamesystem) stats;
+    this.cachedLocatedBlocks = new CachedLocatedBlocks(namesystem);
+    this.cachedFullPathNames = new CachedFullPathNames(namesystem);
+    this.raidTempPrefix = RaidNode.xorTempPrefix(conf);
+    this.raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
+    this.raidHarTempPrefix = RaidNode.xorHarTempPrefix(conf);
+    this.raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
+    defaultPolicy = new BlockPlacementPolicyDefault(conf, stats, clusterMap);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
+      long blocksize) {
+    HashMap<Node, Node> excluded = new HashMap<Node, Node>();
+    return chooseTarget(srcPath, numOfReplicas, writer, chosenNodes,
+        excluded, blocksize);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Excludes the datanodes that hold companion blocks of the current file so
+   * that the blocks of one stripe are spread over as many nodes as possible.
+   * Any failure falls back to the default placement policy.
+   */
+  @Override
+  DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
+      HashMap<Node, Node> excludedNodes, long blocksize) {
+    try {
+      if (excludedNodes == null) {
+        excludedNodes = new HashMap<Node, Node>();
+      }
+      addExcludedNodes(srcPath, excludedNodes);
+      DatanodeDescriptor[] result =
+        defaultPolicy.chooseTarget(numOfReplicas, writer,
+          chosenNodes, excludedNodes, blocksize);
+      // Remember the locations chosen for this block so that subsequent
+      // blocks of the same file can exclude them.
+      cachedLocatedBlocks.get(srcPath).
+          add(new LocatedBlock(new Block(), result));
+      return result;
+    } catch (Exception e) {
+      FSNamesystem.LOG.debug(
+        "Error happened when choosing datanode to write.", e);
+      return defaultPolicy.chooseTarget(srcPath, numOfReplicas, writer,
+                                chosenNodes, blocksize);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int verifyBlockPlacement(String srcPath, LocatedBlock lBlk,
+      int minRacks) {
+    return defaultPolicy.verifyBlockPlacement(srcPath, lBlk, minRacks);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+      Block block, short replicationFactor,
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+
+    DatanodeDescriptor chosenNode = null;
+    try {
+      String path = cachedFullPathNames.get(inode);
+      List<LocatedBlock> companionBlocks = getCompanionBlocks(path, block);
+      if (companionBlocks == null || companionBlocks.size() == 0) {
+        // Use the default method if it is not a valid raided or parity file
+        return defaultPolicy.chooseReplicaToDelete(
+            inode, block, replicationFactor, first, second);
+      }
+      // Delete from the first collection first
+      // This ensures the number of unique racks of this block is not reduced
+      Collection<DatanodeDescriptor> all = new HashSet<DatanodeDescriptor>();
+      all.addAll(first);
+      all.addAll(second);
+      chosenNode = chooseReplicaToDelete(companionBlocks, all);
+      if (chosenNode != null) {
+        return chosenNode;
+      }
+      return defaultPolicy.chooseReplicaToDelete(
+          inode, block, replicationFactor, first, second);
+    } catch (Exception e) {
+      LOG.debug("Failed to choose the correct replica to delete", e);
+      return defaultPolicy.chooseReplicaToDelete(
+          inode, block, replicationFactor, first, second);
+    }
+  }
+
+  /**
+   * Obtain the excluded nodes for the current block that is being written
+   * @param file the file being written
+   * @param excluded the map the excluded nodes are collected into
+   */
+  void addExcludedNodes(String file, HashMap<Node, Node> excluded)
+      throws IOException {
+    for (LocatedBlock b : getCompanionBlocks(file)) {
+      for (Node n : b.getLocations()) {
+        excluded.put(n, n);
+      }
+    }
+  }
+
+  /**
+   * Choose the replica stored on the datanode that holds the most companion
+   * blocks, breaking ties by rack count and then by remaining space.
+   * @return the chosen datanode, or null if there is no candidate
+   */
+  private DatanodeDescriptor chooseReplicaToDelete(
+      Collection<LocatedBlock> companionBlocks,
+      Collection<DatanodeDescriptor> dataNodes) throws IOException {
+
+    if (dataNodes.isEmpty()) {
+      return null;
+    }
+    // Count the number of replicas on each node and rack
+    final Map<String, Integer> nodeCompanionBlockCount =
+      countCompanionBlocks(companionBlocks, false);
+    final Map<String, Integer> rackCompanionBlockCount =
+      countCompanionBlocks(companionBlocks, true);
+
+    NodeComparator comparator =
+      new NodeComparator(nodeCompanionBlockCount, rackCompanionBlockCount);
+    return Collections.max(dataNodes, comparator);
+  }
+
+  /**
+   * Count how many companion blocks are on each datanode or each rack
+   * @param companionBlocks a collection of all the companion blocks
+   * @param doRackCount count the companion blocks on the racks of datanodes
+   * @return the map from node name to the number of companion blocks
+   */
+  static Map<String, Integer> countCompanionBlocks(
+      Collection<LocatedBlock> companionBlocks, boolean doRackCount) {
+    Map<String, Integer> result = new HashMap<String, Integer>();
+    for (LocatedBlock block : companionBlocks) {
+      for (DatanodeInfo d : block.getLocations()) {
+        String name = doRackCount ? d.getParent().getName() : d.getName();
+        Integer count = result.get(name);
+        result.put(name, count == null ? 1 : count + 1);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Compares the datanodes based on the number of companion blocks on the same
+   * node and rack. If even, compare the remaining space on the datanodes.
+   */
+  class NodeComparator implements Comparator<DatanodeDescriptor> {
+    private Map<String, Integer> nodeBlockCount;
+    private Map<String, Integer> rackBlockCount;
+    private NodeComparator(Map<String, Integer> nodeBlockCount,
+                           Map<String, Integer> rackBlockCount) {
+      this.nodeBlockCount = nodeBlockCount;
+      this.rackBlockCount = rackBlockCount;
+    }
+    /** {@inheritDoc} */
+    @Override
+    public int compare(DatanodeDescriptor d1, DatanodeDescriptor d2) {
+      int res = compareBlockCount(d1, d2, nodeBlockCount);
+      if (res != 0) {
+        return res;
+      }
+      res = compareBlockCount(d1.getParent(), d2.getParent(), rackBlockCount);
+      if (res != 0) {
+        return res;
+      }
+      // Collections.max() picks the "largest" node to delete from, so a
+      // node with more remaining space must compare smaller here.
+      if (d1.getRemaining() > d2.getRemaining()) {
+        return -1;
+      }
+      if (d1.getRemaining() < d2.getRemaining()) {
+        return 1;
+      }
+      return 0;
+    }
+    /** Compare two nodes by their companion block counts (missing = 0) */
+    private int compareBlockCount(Node node1, Node node2,
+                                  Map<String, Integer> blockCount) {
+      Integer count1 = blockCount.get(node1.getName());
+      Integer count2 = blockCount.get(node2.getName());
+      count1 = count1 == null ? 0 : count1;
+      count2 = count2 == null ? 0 : count2;
+      if (count1 > count2) {
+        return 1;
+      }
+      if (count1 < count2) {
+        return -1;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Obtain the companion blocks of the block that is currently being written.
+   * Companion blocks are defined as the blocks that can help recover each
+   * other by using the raid decoder.
+   * @param path the path of the file that contains the block
+   * @return the block locations of companion blocks
+   */
+  List<LocatedBlock> getCompanionBlocks(String path)
+      throws IOException {
+    // This will be the index of the block which is currently being written
+    int blockIndex = cachedLocatedBlocks.get(path).size();
+    return getCompanionBlocks(path, blockIndex);
+  }
+
+  /**
+   * Obtain the companion blocks of the given block.
+   * Companion blocks are defined as the blocks that can help recover each
+   * other by using the raid decoder.
+   * @param path the path of the file that contains the block
+   * @param block the given block
+   * @return the block locations of companion blocks
+   */
+  List<LocatedBlock> getCompanionBlocks(String path, Block block)
+      throws IOException {
+    int blockIndex = getBlockIndex(path, block);
+    return getCompanionBlocks(path, blockIndex);
+  }
+
+  /**
+   * Dispatch on the kind of the file (har/temp parity, parity, or raided
+   * source) and compute the companion blocks of the block at the given index.
+   */
+  List<LocatedBlock> getCompanionBlocks(String path, int blockIndex)
+      throws IOException {
+    if (isXorHarTempParityFile(path)) {
+      // temp har xor parity file
+      return getCompanionBlocksForHarParityBlock(
+          path, xorParityLength, blockIndex);
+    }
+    if (isRsHarTempParityFile(path)) {
+      // temp har rs parity file
+      return getCompanionBlocksForHarParityBlock(
+          path, rsParityLength, blockIndex);
+    }
+    if (isXorTempParityFile(path)) {
+      // temp xor parity file
+      return getCompanionBlocksForParityBlock(
+          getSourceFile(path, raidTempPrefix), path,
+          xorParityLength, blockIndex);
+    }
+    if (isRsTempParityFile(path)) {
+      // temp rs parity file
+      return getCompanionBlocksForParityBlock(
+          getSourceFile(path, raidrsTempPrefix), path,
+          rsParityLength, blockIndex);
+    }
+    if (isXorParityFile(path)) {
+      // xor parity file
+      return getCompanionBlocksForParityBlock(getSourceFile(path, xorPrefix),
+          path, xorParityLength, blockIndex);
+    }
+    if (isRsParityFile(path)) {
+      // rs parity file
+      return getCompanionBlocksForParityBlock(getSourceFile(path, rsPrefix),
+          path, rsParityLength, blockIndex);
+    }
+    String parity = getParityFile(path);
+    if (parity == null) {
+      // corresponding parity file not found.
+      // return an empty list
+      return new ArrayList<LocatedBlock>();
+    }
+    if (isXorParityFile(parity)) {
+      // xor raided source file
+      return getCompanionBlocksForSourceBlock(
+          path, parity, xorParityLength, blockIndex);
+    }
+    if (isRsParityFile(parity)) {
+      // rs raided source file
+      return getCompanionBlocksForSourceBlock(
+          path, parity, rsParityLength, blockIndex);
+    }
+    // return an empty list
+    return new ArrayList<LocatedBlock>();
+  }
+
+  private List<LocatedBlock> getCompanionBlocksForHarParityBlock(
+      String parity, int parityLength, int blockIndex)
+      throws IOException {
+    // consider only parity file in this case because source file block
+    // location is not easy to obtain
+    List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
+    List<LocatedBlock> result = new ArrayList<LocatedBlock>();
+    synchronized (parityBlocks) {
+      int start = Math.max(0, blockIndex - parityLength + 1);
+      int end = Math.min(parityBlocks.size(), blockIndex + parityLength);
+      // Copy inside the synchronized block: subList() is only a view of the
+      // synchronized list and must not escape the lock (this matches the
+      // addAll pattern used by the other getCompanionBlocksFor* methods).
+      result.addAll(parityBlocks.subList(start, end));
+    }
+    return result;
+  }
+
+  private List<LocatedBlock> getCompanionBlocksForParityBlock(
+      String src, String parity, int parityLength, int blockIndex)
+      throws IOException {
+    List<LocatedBlock> result = new ArrayList<LocatedBlock>();
+    List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
+    int stripeIndex = blockIndex / parityLength;
+    synchronized (parityBlocks) {
+      int parityStart = stripeIndex * parityLength;
+      int parityEnd = Math.min(parityStart + parityLength,
+                               parityBlocks.size());
+      // for parity, always consider the neighbor blocks as companion blocks
+      if (parityStart < parityBlocks.size()) {
+        result.addAll(parityBlocks.subList(parityStart, parityEnd));
+      }
+    }
+
+    if (src == null) {
+      return result;
+    }
+    List<LocatedBlock> sourceBlocks = cachedLocatedBlocks.get(src);
+    synchronized (sourceBlocks) {
+      int sourceStart = stripeIndex * stripeLength;
+      int sourceEnd = Math.min(sourceStart + stripeLength,
+                               sourceBlocks.size());
+      if (sourceStart < sourceBlocks.size()) {
+        result.addAll(sourceBlocks.subList(sourceStart, sourceEnd));
+      }
+    }
+    return result;
+  }
+
+  private List<LocatedBlock> getCompanionBlocksForSourceBlock(
+      String src, String parity, int parityLength, int blockIndex)
+      throws IOException {
+    List<LocatedBlock> result = new ArrayList<LocatedBlock>();
+    List<LocatedBlock> sourceBlocks = cachedLocatedBlocks.get(src);
+    int stripeIndex = blockIndex / stripeLength;
+    synchronized (sourceBlocks) {
+      int sourceStart = stripeIndex * stripeLength;
+      int sourceEnd = Math.min(sourceStart + stripeLength,
+                               sourceBlocks.size());
+      if (sourceStart < sourceBlocks.size()) {
+        result.addAll(sourceBlocks.subList(sourceStart, sourceEnd));
+      }
+    }
+    if (parity == null) {
+      return result;
+    }
+    List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
+    synchronized (parityBlocks) {
+      int parityStart = stripeIndex * parityLength;
+      int parityEnd = Math.min(parityStart + parityLength,
+                               parityBlocks.size());
+      if (parityStart < parityBlocks.size()) {
+        result.addAll(parityBlocks.subList(parityStart, parityEnd));
+      }
+    }
+    return result;
+  }
+
+  /** Locate the index of the given block within the file's block list */
+  private int getBlockIndex(String file, Block block) throws IOException {
+    List<LocatedBlock> blocks = cachedLocatedBlocks.get(file);
+    synchronized (blocks) {
+      for (int i = 0; i < blocks.size(); i++) {
+        if (blocks.get(i).getBlock().equals(block)) {
+          return i;
+        }
+      }
+    }
+    throw new IOException("Cannot locate " + block + " in file " + file);
+  }
+
+  /**
+   * Cache results for FSInodeInfo.getFullPathName()
+   */
+  static class CachedFullPathNames {
+    FSNamesystem namesystem;
+    CachedFullPathNames(FSNamesystem namesystem) {
+      this.namesystem = namesystem;
+    }
+    private Cache<INodeWithHashCode, String> cacheInternal =
+      new Cache<INodeWithHashCode, String>() {
+        @Override
+        public String getDirectly(INodeWithHashCode inode) throws IOException {
+          namesystem.readLock();
+          try {
+            return inode.getFullPathName();
+          } finally {
+            namesystem.readUnlock();
+          }
+        }
+      };
+
+    /**
+     * Cache key that compares the wrapped inodes by identity so that two
+     * wrappers around the same inode are equal cache keys.
+     */
+    static private class INodeWithHashCode {
+      FSInodeInfo inode;
+      INodeWithHashCode(FSInodeHashCode inode) {
+        this.inode = inode;
+      }
+      @Override
+      public boolean equals(Object obj) {
+        // Identity-compare the wrapped inodes. The previous version compared
+        // "inode == obj", which could never match another key wrapper, so
+        // cache lookups always missed.
+        return obj instanceof INodeWithHashCode &&
+            inode == ((INodeWithHashCode) obj).inode;
+      }
+      @Override
+      public int hashCode() {
+        return System.identityHashCode(inode);
+      }
+      String getFullPathName() {
+        return inode.getFullPathName();
+      }
+    }
+
+    public String get(FSInodeInfo inode) throws IOException {
+      return cacheInternal.get(new INodeWithHashCode(inode));
+    }
+  }
+
+  /**
+   * Cache results for FSNamesystem.getBlockLocations()
+   */
+  static class CachedLocatedBlocks extends Cache<String, List<LocatedBlock>> {
+    FSNamesystem namesystem;
+    CachedLocatedBlocks(FSNamesystem namesystem) {
+      this.namesystem = namesystem;
+    }
+    @Override
+    public List<LocatedBlock> getDirectly(String file) throws IOException {
+      long len = namesystem.getFileInfo(file, true).getLen();
+      List<LocatedBlock> result = namesystem.getBlockLocations(
+          file, 0L, len, false, false).getLocatedBlocks();
+      if (result == null || result.isEmpty()) {
+        result = new ArrayList<LocatedBlock>();
+      }
+      // Callers must hold the list's monitor while iterating over it.
+      return Collections.synchronizedList(result);
+    }
+  }
+
+  /**
+   * A small time- and size-bounded cache: entries expire after five minutes
+   * and at most 50000 entries are kept (eldest evicted first).
+   */
+  static abstract class Cache<K, V> {
+    private Map<K, ValueWithTime> cache;
+    private static final long CACHE_TIMEOUT = 300000L; // 5 minutes
+    // The timeout is long but the consequence of stale value is not serious
+    Cache() {
+      Map<K, ValueWithTime> map = new LinkedHashMap<K, ValueWithTime>() {
+        private static final long serialVersionUID = 1L;
+          final private int MAX_ENTRIES = 50000;
+          @Override
+          protected boolean removeEldestEntry(
+            Map.Entry<K, ValueWithTime> eldest) {
+            return size() > MAX_ENTRIES;
+          }
+        };
+      this.cache = Collections.synchronizedMap(map);
+    }
+
+    // Note that this method may hold FSNamesystem.readLock() and it may
+    // be called inside FSNamesystem.writeLock(). If we make this method
+    // synchronized, it will deadlock.
+    abstract protected V getDirectly(K key) throws IOException;
+
+    public V get(K key) throws IOException {
+      // The method is not synchronized so we may get some stale value here but
+      // it's OK.
+      ValueWithTime result = cache.get(key);
+      long now = System.currentTimeMillis();
+      if (result != null &&
+          now - result.cachedTime < CACHE_TIMEOUT) {
+        return result.value;
+      }
+      result = new ValueWithTime();
+      result.value = getDirectly(key);
+      result.cachedTime = now;
+      cache.put(key, result);
+      return result.value;
+    }
+    /** A cached value together with the time at which it was cached */
+    private class ValueWithTime {
+      V value = null;
+      long cachedTime = 0L;
+    }
+  }
+
+  /**
+   * Get the path of the corresponding source file for a valid parity
+   * file. Returns null if it does not exist.
+   * @param parity the toUri path of the parity file
+   * @param prefix the parity prefix to strip off the path
+   * @return the toUri path of the source file
+   */
+  String getSourceFile(String parity, String prefix) throws IOException {
+    if (isHarFile(parity)) {
+      return null;
+    }
+    // remove the prefix
+    String src = parity.substring(prefix.length());
+    if (namesystem.dir.getFileInfo(src, true) == null) {
+      return null;
+    }
+    return src;
+  }
+
+  /**
+   * Get the path of the corresponding parity file for a source file.
+   * Returns null if it does not exist.
+   * @param src the toUri path of the source file
+   * @return the toUri path of the parity file
+   */
+  String getParityFile(String src) throws IOException {
+    String xorParity = getParityFile(xorPrefix, src);
+    if (xorParity != null) {
+      return xorParity;
+    }
+    String rsParity = getParityFile(rsPrefix, src);
+    if (rsParity != null) {
+      return rsParity;
+    }
+    return null;
+  }
+
+  /**
+   * Get the path of the parity file. Returns null if it does not exist.
+   * @param parityPrefix usually "/raid/" or "/raidrs/"
+   * @return the toUri path of the parity file
+   */
+  private String getParityFile(String parityPrefix, String src)
+      throws IOException {
+    String parity = parityPrefix + src;
+    if (namesystem.dir.getFileInfo(parity, true) == null) {
+      return null;
+    }
+    return parity;
+  }
+
+  private boolean isHarFile(String path) {
+    return path.lastIndexOf(RaidNode.HAR_SUFFIX) != -1;
+  }
+
+  private boolean isXorHarTempParityFile(String path) {
+    return path.startsWith(raidHarTempPrefix + Path.SEPARATOR);
+  }
+
+  private boolean isRsHarTempParityFile(String path) {
+    return path.startsWith(raidrsHarTempPrefix + Path.SEPARATOR);
+  }
+
+  private boolean isXorTempParityFile(String path) {
+    return path.startsWith(raidTempPrefix + Path.SEPARATOR);
+  }
+
+  private boolean isRsTempParityFile(String path) {
+    return path.startsWith(raidrsTempPrefix + Path.SEPARATOR);
+  }
+
+  private boolean isXorParityFile(String path) {
+    return path.startsWith(xorPrefix + Path.SEPARATOR);
+  }
+
+  private boolean isRsParityFile(String path) {
+    return path.startsWith(rsPrefix + Path.SEPARATOR);
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1044523&r1=1044522&r2=1044523&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Fri Dec 10 22:29:38 2010
@@ -1361,6 +1361,13 @@ public abstract class RaidNode implement
   /**
    * Return the temp path for XOR parity files
    */
+  public static String xorTempPrefix(Configuration conf) {
+    // Falls back to the default temp location when the key is not configured
+    return conf.get(RAID_TMP_LOCATION_KEY, DEFAULT_RAID_TMP_LOCATION);
+  }
+
+  /**
+   * Return the HAR temp path for XOR parity files
+   */
   public static String xorHarTempPrefix(Configuration conf) {
     return conf.get(RAID_HAR_TMP_LOCATION_KEY, DEFAULT_RAID_HAR_TMP_LOCATION);
   }

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java?rev=1044523&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
Fri Dec 10 22:29:38 2010
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedFullPathNames;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedLocatedBlocks;
+import org.apache.hadoop.raid.RaidNode;
+import org.junit.Test;
+
+/** Tests for {@link BlockPlacementPolicyRaid} against a MiniDFSCluster. */
+public class TestBlockPlacementPolicyRaid {
+  private Configuration conf = null;
+  private MiniDFSCluster cluster = null;
+  private FSNamesystem namesystem = null;
+  private BlockPlacementPolicyRaid policy = null;
+  private FileSystem fs = null;
+  // rack/host of the first datanode; rack2/host2 are used for later datanodes
+  String[] rack1 = {"/rack1"};
+  String[] rack2 = {"/rack2"};
+  String[] host1 = {"host1.rack1.com"};
+  String[] host2 = {"host2.rack2.com"};
+  // raid path prefixes, populated by setupCluster() from RaidNode config
+  String xorPrefix = null;
+  String raidTempPrefix = null;
+  String raidrsTempPrefix = null;
+  String raidrsHarTempPrefix = null;
+
+  final static Log LOG =
+      LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
+
+  /**
+   * Start a single-datanode MiniDFSCluster configured to use
+   * BlockPlacementPolicyRaid, and cache the raid path prefixes
+   * from the RaidNode configuration.
+   */
+  protected void setupCluster() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+    conf.set("dfs.replication.pending.timeout.sec", "2");
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
+    conf.set("dfs.block.replicator.classname",
+             "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid");
+    conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
+    conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+    // start the cluster with one datanode first
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
+        format(true).racks(rack1).hosts(host1).build();
+    cluster.waitActive();
+    namesystem = cluster.getNameNode().getNamesystem();
+    Assert.assertTrue("BlockPlacementPolicy type is not correct.",
+      namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid);
+    policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator;
+    fs = cluster.getFileSystem();
+    xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+    raidTempPrefix = RaidNode.xorTempPrefix(conf);
+    raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
+    raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
+  }
+
+  /**
+   * Test that the parity files will be placed at suitable locations when we
+   * create them.
+   */
+  @Test
+  public void testChooseTargetForRaidFile() throws IOException {
+    setupCluster();
+    try {
+      String src = "/dir/file";
+      String parity = raidrsTempPrefix + src;
+      DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
+      DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
+      refreshPolicy();
+      setBlockPlacementPolicy(namesystem, policy);
+      // start 6 more datanodes
+      String[] racks = {"/rack2", "/rack2", "/rack2",
+                        "/rack2", "/rack2", "/rack2"};
+      String[] hosts =
+        {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+         "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+      cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+      int numBlocks = 6;
+      DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
+      FileStatus srcStat = fs.getFileStatus(new Path(src));
+      BlockLocation[] srcLoc =
+        fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
+      FileStatus parityStat = fs.getFileStatus(new Path(parity));
+      BlockLocation[] parityLoc =
+          fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
+      int stripeLen = RaidNode.getStripeLength(conf);
+      for (int i = 0; i < numBlocks / stripeLen; i++) {
+        Set<String> locations = new HashSet<String>();
+        for (int j = 0; j < srcLoc.length; j++) {
+          String [] names = srcLoc[j].getNames();
+          for (int k = 0; k < names.length; k++) {
+            LOG.info("Source block location: " + names[k]);
+            locations.add(names[k]);
+          }
+        }
+        for (int j = 0 ; j < stripeLen; j++) {
+          String[] names = parityLoc[j + i * stripeLen].getNames();
+          for (int k = 0; k < names.length; k++) {
+            LOG.info("Parity block location: " + names[k]);
+            // a parity replica must not share a node with any source replica;
+            // Set.add returns false on a duplicate location
+            Assert.assertTrue(locations.add(names[k]));
+          }
+        }
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test that the har parity files will be placed at suitable locations when
+   * we create them.
+   */
+  @Test
+  public void testChooseTargetForHarRaidFile() throws IOException {
+    setupCluster();
+    try {
+      String[] racks = {"/rack2", "/rack2", "/rack2",
+                        "/rack2", "/rack2", "/rack2"};
+      String[] hosts =
+        {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+         "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+      cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+      String harParity = raidrsHarTempPrefix + "/dir/file";
+      int numBlocks = 11;
+      DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
+      DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
+      FileStatus stat = fs.getFileStatus(new Path(harParity));
+      BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
+      int rsParityLength = RaidNode.rsParityLength(conf);
+      for (int i = 0; i < numBlocks - rsParityLength; i++) {
+        Set<String> locations = new HashSet<String>();
+        for (int j = 0; j < rsParityLength; j++) {
+          for (int k = 0; k < loc[i + j].getNames().length; k++) {
+            // verify that every rsParityLength adjacent blocks are on
+            // different nodes
+            String name = loc[i + j].getNames()[k];
+            LOG.info("Har Raid block location: " + name);
+            Assert.assertTrue(locations.add(name));
+          }
+        }
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
+   * Verify that the results obtained from the cache are the same as
+   * the results obtained directly
+   */
+  @Test
+  public void testCachedBlocks() throws IOException {
+    setupCluster();
+    try {
+      String file1 = "/dir/file1";
+      String file2 = "/dir/file2";
+      DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+      DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+      // test blocks cache
+      CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
+      verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+      verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+      verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+      verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+      try {
+        // sleep past the presumed cache-expiry window so entries are
+        // refetched -- TODO confirm against the CachedLocatedBlocks TTL
+        Thread.sleep(1200L);
+      } catch (InterruptedException e) {
+      }
+      verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+      verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test BlockPlacementPolicyRaid.CachedFullPathNames
+   * Verify that the results obtained from the cache are the same as
+   * the results obtained directly
+   */
+  @Test
+  public void testCachedPathNames() throws IOException {
+    setupCluster();
+    try {
+      String file1 = "/dir/file1";
+      String file2 = "/dir/file2";
+      DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+      DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+      // test full path cache
+      CachedFullPathNames cachedFullPathNames =
+          new CachedFullPathNames(namesystem);
+      FSInodeInfo inode1 = null;
+      FSInodeInfo inode2 = null;
+      // resolve the inodes under the directory read lock
+      namesystem.dir.readLock();
+      try {
+        inode1 = namesystem.dir.rootDir.getNode(file1, true);
+        inode2 = namesystem.dir.rootDir.getNode(file2, true);
+      } finally {
+        namesystem.dir.readUnlock();
+      }
+      verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+      verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+      verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+      verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+      try {
+        // sleep past the presumed cache-expiry window -- TODO confirm TTL
+        Thread.sleep(1200L);
+      } catch (InterruptedException e) {
+      }
+      verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+      verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  /**
+   * Test the result of getCompanionBlocks() on unraided files and on files
+   * placed under the xor/rs raid directories.
+   */
+  // NOTE(review): "BLocks" in the method name is a typo (harmless for JUnit)
+  @Test
+  public void testGetCompanionBLocks() throws IOException {
+    setupCluster();
+    try {
+      String file1 = "/dir/file1";
+      String file2 = "/raid/dir/file2";
+      String file3 = "/raidrs/dir/file3";
+      // Set the policy to default policy to place the block in the default way
+      setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+          conf, namesystem, namesystem.clusterMap));
+      DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+      DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+      DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
+      Collection<LocatedBlock> companionBlocks;
+
+      // an unraided file is expected to have no companion blocks
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
+      Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
+      Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
+      Assert.assertEquals(1, companionBlocks.size());
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
+      Assert.assertEquals(1, companionBlocks.size());
+
+      int rsParityLength = RaidNode.rsParityLength(conf);
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
+      Assert.assertEquals(rsParityLength, companionBlocks.size());
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
+      Assert.assertEquals(rsParityLength, companionBlocks.size());
+
+      // presumably the trailing blocks of file3 form a partial group,
+      // hence only 2 companions -- confirm against getCompanionBlocks()
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
+      Assert.assertEquals(2, companionBlocks.size());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /** Swap in {@code policy} as the block replicator under the namesystem write lock. */
+  static void setBlockPlacementPolicy(
+      FSNamesystem namesystem, BlockPlacementPolicy policy) {
+    namesystem.writeLock();
+    try {
+      namesystem.blockManager.replicator = policy;
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Test BlockPlacementPolicyRaid actually deletes the correct replica.
+   * Start 2 datanodes and create 1 source file and its parity file.
+   * 1) Start host1, create the parity file with replication 1
+   * 2) Start host2, create the source file with replication 2
+   * 3) Set replication of source file to 1
+   * Verify that the policy should delete the block with more companion blocks.
+   */
+  @Test
+  public void testDeleteReplica() throws IOException {
+    setupCluster();
+    try {
+      // Set the policy to default policy to place the block in the default way
+      setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+          conf, namesystem, namesystem.clusterMap));
+      DatanodeDescriptor datanode1 =
+        namesystem.datanodeMap.values().iterator().next();
+      String source = "/dir/file";
+      String parity = xorPrefix + source;
+
+      final Path parityPath = new Path(parity);
+      DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
+      DFSTestUtil.waitReplication(fs, parityPath, (short)1);
+
+      // start one more datanode
+      cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
+      DatanodeDescriptor datanode2 = null;
+      for (DatanodeDescriptor d : namesystem.datanodeMap.values()) {
+        if (!d.getName().equals(datanode1.getName())) {
+          datanode2 = d;
+        }
+      }
+      Assert.assertTrue(datanode2 != null);
+      cluster.waitActive();
+      final Path sourcePath = new Path(source);
+      DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
+      DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
+
+      refreshPolicy();
+      Assert.assertEquals(parity,
+                          policy.getParityFile(source));
+      Assert.assertEquals(source,
+                          policy.getSourceFile(parity, xorPrefix));
+
+      List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
+      List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
+      Assert.assertEquals(5, sourceBlocks.size());
+      Assert.assertEquals(3, parityBlocks.size());
+
+      // verify the result of getCompanionBlocks()
+      Collection<LocatedBlock> companionBlocks;
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, sourceBlocks.get(0).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{0, 1}, new int[]{0});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, sourceBlocks.get(1).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{0, 1}, new int[]{0});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, sourceBlocks.get(2).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{2, 3}, new int[]{1});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, sourceBlocks.get(3).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{2, 3}, new int[]{1});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, sourceBlocks.get(4).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{4}, new int[]{2});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, parityBlocks.get(0).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{0, 1}, new int[]{0});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, parityBlocks.get(1).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{2, 3}, new int[]{1});
+
+      companionBlocks = getCompanionBlocks(
+          namesystem, policy, parityBlocks.get(2).getBlock());
+      verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+                            new int[]{4}, new int[]{2});
+
+      // Set the policy back to raid policy. We have to create a new object
+      // here to clear the block location cache
+      refreshPolicy();
+      setBlockPlacementPolicy(namesystem, policy);
+      // verify policy deletes the correct blocks. companion blocks should be
+      // evenly distributed.
+      fs.setReplication(sourcePath, (short)1);
+      DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
+      Map<String, Integer> counters = new HashMap<String, Integer>();
+      refreshPolicy();
+      for (int i = 0; i < parityBlocks.size(); i++) {
+        companionBlocks = getCompanionBlocks(
+            namesystem, policy, parityBlocks.get(i).getBlock());
+
+        counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+            companionBlocks, false);
+        Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
+                          counters.get(datanode1.getName()) <= 2);
+        Assert.assertTrue(counters.get(datanode1.getName()) +
+                          counters.get(datanode2.getName()) ==
+                          companionBlocks.size());
+
+        counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+            companionBlocks, true);
+        Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
+                          counters.get(datanode1.getParent().getName()) <= 2);
+        Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
+                          counters.get(datanode2.getParent().getName()) ==
+                          companionBlocks.size());
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  // create a new BlockPlacementPolicyRaid to clear its internal caches
+  // (block locations and full path names)
+  private void refreshPolicy() {
+      policy = new BlockPlacementPolicyRaid();
+      policy.initialize(conf, namesystem, namesystem.clusterMap);
+  }
+
+  /**
+   * Assert that {@code companionBlocks} contains exactly the source blocks at
+   * {@code sourceBlockIndexes} plus the parity blocks at
+   * {@code parityBlockIndexes}, and nothing else.
+   */
+  private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
+      List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
+      int[] sourceBlockIndexes, int[] parityBlockIndexes) {
+    Set<Block> blockSet = new HashSet<Block>();
+    for (LocatedBlock b : companionBlocks) {
+      blockSet.add(b.getBlock());
+    }
+    Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
+                        blockSet.size());
+    for (int index : sourceBlockIndexes) {
+      Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
+    }
+    for (int index : parityBlockIndexes) {
+      Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
+    }
+  }
+
+  /** Check that the cached full path name equals the directly computed one. */
+  private void verifyCachedFullPathNameResult(
+      CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
+  throws IOException {
+    String res1 = inode.getFullPathName();
+    String res2 = cachedFullPathNames.get(inode);
+    LOG.info("Actual path name: " + res1);
+    LOG.info("Cached path name: " + res2);
+    // NOTE(review): the assertion re-queries both sides rather than comparing
+    // the res1/res2 values logged above
+    Assert.assertEquals(cachedFullPathNames.get(inode),
+                        inode.getFullPathName());
+  }
+
+  /** Check block-by-block that the cached locations match those fetched directly. */
+  private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
+      FSNamesystem namesystem, String file) throws IOException{
+    long len = namesystem.getFileInfo(file, true).getLen();
+    List<LocatedBlock> res1 = namesystem.getBlockLocations(
+        file, 0L, len, false, false).getLocatedBlocks();
+    List<LocatedBlock> res2 = cachedBlocks.get(file);
+    for (int i = 0; i < res1.size(); i++) {
+      LOG.info("Actual block: " + res1.get(i).getBlock());
+      LOG.info("Cached block: " + res2.get(i).getBlock());
+      Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
+    }
+  }
+
+  /** Resolve the inode of {@code block} and ask the raid policy for its companions. */
+  private Collection<LocatedBlock> getCompanionBlocks(
+      FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
+      Block block) throws IOException {
+    INodeFile inode = namesystem.blockManager.blocksMap.getINode(block);
+    return policy.getCompanionBlocks(inode.getFullPathName(), block);
+  }
+
+  /** Return all located blocks of {@code file} from the namesystem. */
+  private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file) 
+      throws IOException {
+    long len = namesystem.getFileInfo(file, true).getLen();
+    return namesystem.getBlockLocations(
+               file, 0, len, false, false).getLocatedBlocks();
+  }
+}



Mime
View raw message