hadoop-mapreduce-commits mailing list archives

From sc...@apache.org
Subject svn commit: r1078081 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/
Date Fri, 04 Mar 2011 18:06:48 GMT
Author: schen
Date: Fri Mar  4 18:06:47 2011
New Revision: 1078081

URL: http://svn.apache.org/viewvc?rev=1078081&view=rev
Log:
MAPREDUCE-2239. BlockPlacementPolicyRaid should call getBlockLocations only
when necessary. (schen)
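
The heart of the change is to classify a file by its path before doing any work
that needs block locations, and to hand non-RAID files straight to the default
placement policy. The sketch below condenses the new chooseTarget() path from
the diff; it is illustrative only, assumes the surrounding
BlockPlacementPolicyRaid class (defaultPolicy, getFileType, addExcludedNodes),
and the method name chooseTargetSketch is made up here:

    // Classify first; only RAID files pay for companion-block bookkeeping.
    DatanodeDescriptor[] chooseTargetSketch(String srcPath, int numOfReplicas,
        DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
        long blocksize) throws IOException {
      FileType type = getFileType(srcPath);
      if (type == FileType.NOT_RAID) {
        // Fast path: no companion-block lookup, no getBlockLocations call.
        return defaultPolicy.chooseTarget(
            srcPath, numOfReplicas, writer, chosenNodes, blocksize);
      }
      HashMap<Node, Node> excluded = new HashMap<Node, Node>();
      addExcludedNodes(srcPath, type, excluded);  // companion-block locations
      // (The real method also records the new block's locations in
      // cachedLocatedBlocks so later blocks of the same file see them.)
      return defaultPolicy.chooseTarget(numOfReplicas, writer,
          chosenNodes, false, excluded, blocksize);
    }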

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1078081&r1=1078080&r2=1078081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Mar  4 18:06:47 2011
@@ -54,6 +54,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2351. mapred.job.tracker.history.completed.location should
     support an arbitrary filesystem URI. (tomwhite)
 
+    MAPREDUCE-2239. BlockPlacementPolicyRaid should call getBlockLocations
+    only when necessary. (schen)
+
   OPTIMIZATIONS
     
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java?rev=1078081&r1=1078080&r2=1078081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java Fri Mar  4 18:06:47 2011
@@ -100,9 +100,8 @@ public class BlockPlacementPolicyRaid ex
   DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
       DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
       long blocksize) {
-    HashMap<Node, Node> excluded = new HashMap<Node, Node>();
     return chooseTarget(srcPath, numOfReplicas, writer, chosenNodes,
-        excluded, blocksize);
+        null, blocksize);
   }
 
   @Override
@@ -111,21 +110,26 @@ public class BlockPlacementPolicyRaid ex
       boolean returnChosenNodes,
       HashMap<Node, Node> excludedNodes, long blocksize) {
     try {
+      FileType type = getFileType(srcPath);
+      if (type == FileType.NOT_RAID) {
+        return defaultPolicy.chooseTarget(
+            srcPath, numOfReplicas, writer, chosenNodes, blocksize);
+      }
       if (excludedNodes == null) {
         excludedNodes = new HashMap<Node, Node>();
       }
-      addExcludedNodes(srcPath, excludedNodes);
+      addExcludedNodes(srcPath, type, excludedNodes);
       DatanodeDescriptor[] result =
         defaultPolicy.chooseTarget(numOfReplicas, writer,
           chosenNodes, returnChosenNodes, excludedNodes, blocksize);
+      // Add the new block's locations to the block locations cache
+      // so that the rest of the blocks know about these locations.
       cachedLocatedBlocks.get(srcPath).
           add(new LocatedBlock(new Block(), result));
       return result;
     } catch (Exception e) {
-      String trace = StringUtils.stringifyException(e);
-      System.out.println(trace);
-      FSNamesystem.LOG.debug(
-        "Error happend when choosing datanode to write.", e);
+      LOG.debug("Error happend when choosing datanode to write:" +
+        StringUtils.stringifyException(e));
       return defaultPolicy.chooseTarget(srcPath, numOfReplicas, writer,
                                 chosenNodes, blocksize);
     }
@@ -147,7 +151,13 @@ public class BlockPlacementPolicyRaid ex
     DatanodeDescriptor chosenNode = null;
     try {
       String path = cachedFullPathNames.get(inode);
-      List<LocatedBlock> companionBlocks = getCompanionBlocks(path, block);
+      FileType type = getFileType(path);
+      if (type == FileType.NOT_RAID) {
+        return defaultPolicy.chooseReplicaToDelete(
+            inode, block, replicationFactor, first, second);
+      }
+      List<LocatedBlock> companionBlocks =
+          getCompanionBlocks(path, type, block);
       if (companionBlocks == null || companionBlocks.size() == 0) {
         // Use the default method if it is not a valid raided or parity file
         return defaultPolicy.chooseReplicaToDelete(
@@ -165,7 +175,8 @@ public class BlockPlacementPolicyRaid ex
       return defaultPolicy.chooseReplicaToDelete(
           inode, block, replicationFactor, first, second);
     } catch (Exception e) {
-      LOG.debug("Failed to choose the correct replica to delete", e);
+      LOG.debug("Error happend when choosing replica to delete" +
+        StringUtils.stringifyException(e));
       return defaultPolicy.chooseReplicaToDelete(
           inode, block, replicationFactor, first, second);
     }
@@ -174,9 +185,13 @@ public class BlockPlacementPolicyRaid ex
   /**
    * Obtain the excluded nodes for the current block that is being written
    */
-  void addExcludedNodes(String file, HashMap<Node, Node> excluded)
+  void addExcludedNodes(String file, FileType type, HashMap<Node, Node> excluded)
       throws IOException {
-    for (LocatedBlock b : getCompanionBlocks(file)) {
+    Collection<LocatedBlock> blocks = getCompanionBlocks(file, type, null);
+    if (blocks == null) {
+      return;
+    }
+    for (LocatedBlock b : blocks) {
       for (Node n : b.getLocations()) {
         excluded.put(n, n);
       }
@@ -271,90 +286,52 @@ public class BlockPlacementPolicyRaid ex
   }
 
   /**
-   * Obtain the companion blocks of the block that is currently being written.
-   * Companion blocks are defined as the blocks that can help recover each
-   * others by using raid decoder.
-   * @param path the path of the file contains the block
-   * @return the block locations of companion blocks
-   */
-  List<LocatedBlock> getCompanionBlocks(String path)
-      throws IOException {
-    // This will be the index of the block which is currently being written
-    int blockIndex = cachedLocatedBlocks.get(path).size();
-    return getCompanionBlocks(path, blockIndex);
-  }
-
-  /**
    * Obtain the companion blocks of the given block.
    * Companion blocks are defined as the blocks that can help recover each
    * other by using the raid decoder.
-   * @param path the path of the file contains the block
-   * @param block the given block
+   * @param path The path of the file that contains the block
+   * @param type The type of this file
+   * @param block The given block
+   *              null if it is the block which is currently being written to
    * @return the block locations of companion blocks
    */
-  List<LocatedBlock> getCompanionBlocks(String path, Block block)
-      throws IOException {
-    int blockIndex = getBlockIndex(path, block);
-    return getCompanionBlocks(path, blockIndex);
-  }
-
-  List<LocatedBlock> getCompanionBlocks(String path, int blockIndex)
+  List<LocatedBlock> getCompanionBlocks(String path, FileType type, Block block)
       throws IOException {
-    if (isXorHarTempParityFile(path)) {
-      // temp har xor parity file
-      return getCompanionBlocksForHarParityBlock(
-          path, xorParityLength, blockIndex);
-    }
-    if (isRsHarTempParityFile(path)) {
-      // temp har rs parity file
-      return getCompanionBlocksForHarParityBlock(
-          path, rsParityLength, blockIndex);
-    }
-    if (isXorTempParityFile(path)) {
-      // temp xor parity file
-      return getCompanionBlocksForParityBlock(
-          getSourceFile(path, raidTempPrefix), path,
-          xorParityLength, blockIndex);
-    }
-    if (isRsTempParityFile(path)) {
-      // temp rs parity file
-      return getCompanionBlocksForParityBlock(
-          getSourceFile(path, raidrsTempPrefix), path,
-          rsParityLength, blockIndex);
-    }
-    if (isXorParityFile(path)) {
-      // xor parity file
-      return getCompanionBlocksForParityBlock(getSourceFile(path, xorPrefix),
-          path, xorParityLength, blockIndex);
-    }
-    if (isRsParityFile(path)) {
-      // rs parity file
-      return getCompanionBlocksForParityBlock(getSourceFile(path, rsPrefix),
-          path, rsParityLength, blockIndex);
+    switch (type) {
+      case NOT_RAID:
+        return new ArrayList<LocatedBlock>();
+      case XOR_HAR_TEMP_PARITY:
+        return getCompanionBlocksForHarParityBlock(
+            path, xorParityLength, block);
+      case RS_HAR_TEMP_PARITY:
+        return getCompanionBlocksForHarParityBlock(
+            path, rsParityLength, block);
+      case XOR_TEMP_PARITY:
+        return getCompanionBlocksForParityBlock(
+            getSourceFile(path, raidTempPrefix), path, xorParityLength, block);
+      case RS_TEMP_PARITY:
+        return getCompanionBlocksForParityBlock(
+            getSourceFile(path, raidrsTempPrefix), path, rsParityLength, block);
+      case XOR_PARITY:
+        return getCompanionBlocksForParityBlock(getSourceFile(path, xorPrefix),
+            path, xorParityLength, block);
+      case RS_PARITY:
+        return getCompanionBlocksForParityBlock(getSourceFile(path, rsPrefix),
+            path, rsParityLength, block);
+      case XOR_SOURCE:
+        return getCompanionBlocksForSourceBlock(
+            path, getParityFile(path), xorParityLength, block);
+      case RS_SOURCE:
+        return getCompanionBlocksForSourceBlock(
+            path, getParityFile(path), rsParityLength, block);
     }
-    String parity = getParityFile(path);
-    if (parity == null) {
-      // corresponding parity file not found.
-      // return an empty list
-      return new ArrayList<LocatedBlock>();
-    }
-    if (isXorParityFile(parity)) {
-      // xor raided source file
-      return getCompanionBlocksForSourceBlock(
-          path, parity, xorParityLength, blockIndex);
-    }
-    if (isRsParityFile(parity)) {
-      // rs raided source file
-      return getCompanionBlocksForSourceBlock(
-          path, parity, rsParityLength, blockIndex);
-    }
-    // return an empty list
     return new ArrayList<LocatedBlock>();
   }
 
   private List<LocatedBlock> getCompanionBlocksForHarParityBlock(
-      String parity, int parityLength, int blockIndex)
+      String parity, int parityLength, Block block)
       throws IOException {
+    int blockIndex = getBlockIndex(parity, block);
     // consider only parity file in this case because source file block
     // location is not easy to obtain
     List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
@@ -362,14 +339,15 @@ public class BlockPlacementPolicyRaid ex
     synchronized (parityBlocks) {
       int start = Math.max(0, blockIndex - parityLength + 1);
       int end = Math.min(parityBlocks.size(), blockIndex + parityLength);
-      result = parityBlocks.subList(start, end);
+      result.addAll(parityBlocks.subList(start, end));
     }
     return result;
   }
 
   private List<LocatedBlock> getCompanionBlocksForParityBlock(
-      String src, String parity, int parityLength, int blockIndex)
+      String src, String parity, int parityLength, Block block)
       throws IOException {
+    int blockIndex = getBlockIndex(parity, block);
     List<LocatedBlock> result = new ArrayList<LocatedBlock>();
     List<LocatedBlock> parityBlocks = cachedLocatedBlocks.get(parity);
     int stripeIndex = blockIndex / parityLength;
@@ -399,8 +377,9 @@ public class BlockPlacementPolicyRaid ex
   }
 
   private List<LocatedBlock> getCompanionBlocksForSourceBlock(
-      String src, String parity, int parityLength, int blockIndex)
+      String src, String parity, int parityLength, Block block)
       throws IOException {
+    int blockIndex = getBlockIndex(src, block);
     List<LocatedBlock> result = new ArrayList<LocatedBlock>();
     List<LocatedBlock> sourceBlocks = cachedLocatedBlocks.get(src);
     int stripeIndex = blockIndex / stripeLength;
@@ -430,6 +409,11 @@ public class BlockPlacementPolicyRaid ex
   private int getBlockIndex(String file, Block block) throws IOException {
     List<LocatedBlock> blocks = cachedLocatedBlocks.get(file);
     synchronized (blocks) {
+      // A null block denotes the block that is currently being added,
+      // so return size() as its index.
+      if (block == null) {
+        return blocks.size();
+      }
       for (int i = 0; i < blocks.size(); i++) {
         if (blocks.get(i).getBlock().equals(block)) {
           return i;
@@ -600,29 +584,47 @@ public class BlockPlacementPolicyRaid ex
     return path.lastIndexOf(RaidNode.HAR_SUFFIX) != -1;
   }
 
-  private boolean isXorHarTempParityFile(String path) {
-    return path.startsWith(raidHarTempPrefix + Path.SEPARATOR);
-  }
-
-  private boolean isRsHarTempParityFile(String path) {
-    return path.startsWith(raidrsHarTempPrefix + Path.SEPARATOR);
-  }
-
-  private boolean isXorTempParityFile(String path) {
-    return path.startsWith(raidTempPrefix + Path.SEPARATOR);
-  }
-
-  private boolean isRsTempParityFile(String path) {
-    return path.startsWith(raidrsTempPrefix + Path.SEPARATOR);
-  }
-
-  private boolean isXorParityFile(String path) {
-    return path.startsWith(xorPrefix + Path.SEPARATOR);
-  }
-
-  private boolean isRsParityFile(String path) {
-    return path.startsWith(rsPrefix + Path.SEPARATOR);
+  enum FileType {
+    NOT_RAID,
+    XOR_HAR_TEMP_PARITY,
+    XOR_TEMP_PARITY,
+    XOR_PARITY,
+    XOR_SOURCE,
+    RS_HAR_TEMP_PARITY,
+    RS_TEMP_PARITY,
+    RS_PARITY,
+    RS_SOURCE,
+  }
+
+  FileType getFileType(String path) throws IOException {
+    if (path.startsWith(raidHarTempPrefix + Path.SEPARATOR)) {
+      return FileType.XOR_HAR_TEMP_PARITY;
+    }
+    if (path.startsWith(raidrsHarTempPrefix + Path.SEPARATOR)) {
+      return FileType.RS_HAR_TEMP_PARITY;
+    }
+    if (path.startsWith(raidTempPrefix + Path.SEPARATOR)) {
+      return FileType.XOR_TEMP_PARITY;
+    }
+    if (path.startsWith(raidrsTempPrefix + Path.SEPARATOR)) {
+      return FileType.RS_TEMP_PARITY;
+    }
+    if (path.startsWith(xorPrefix + Path.SEPARATOR)) {
+      return FileType.XOR_PARITY;
+    }
+    if (path.startsWith(rsPrefix + Path.SEPARATOR)) {
+      return FileType.RS_PARITY;
+    }
+    String parity = getParityFile(path);
+    if (parity == null) {
+      return FileType.NOT_RAID;
+    }
+    if (parity.startsWith(xorPrefix + Path.SEPARATOR)) {
+      return FileType.XOR_SOURCE;
+    }
+    if (parity.startsWith(rsPrefix + Path.SEPARATOR)) {
+      return FileType.RS_SOURCE;
+    }
+    return FileType.NOT_RAID;
   }
-
-
 }
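
One detail in the hunks above is worth spelling out: getCompanionBlocksForHarParityBlock
now copies the slice with result.addAll(parityBlocks.subList(start, end)) instead
of returning the subList directly. List.subList returns a view backed by the
original list, so handing that view out of the synchronized block both leaks
unsynchronized access to the cached list and leaves a view that breaks as soon
as the backing list grows. A small self-contained illustration (plain JDK,
nothing Hadoop-specific):

    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.List;

    public class SubListVsCopy {
      public static void main(String[] args) {
        List<String> cached = new ArrayList<String>();
        cached.add("blk0"); cached.add("blk1"); cached.add("blk2");

        List<String> view = cached.subList(0, 2);                        // view of 'cached'
        List<String> copy = new ArrayList<String>(cached.subList(0, 2)); // independent copy

        cached.add("blk3");            // structural modification of the backing list

        System.out.println(copy);      // [blk0, blk1] - still valid
        try {
          System.out.println(view);    // iterating the stale view fails
        } catch (ConcurrentModificationException e) {
          System.out.println("view invalidated once the backing list changed");
        }
      }
    }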

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java?rev=1078081&r1=1078080&r2=1078081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java Fri Mar  4 18:06:47 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedFullPathNames;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedLocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.FileType;
 import org.apache.hadoop.raid.RaidNode;
 import org.junit.Test;
 
@@ -111,6 +112,7 @@ public class TestBlockPlacementPolicyRai
       cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
       int numBlocks = 6;
       DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
+      DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
       FileStatus srcStat = fs.getFileStatus(new Path(src));
       BlockLocation[] srcLoc =
         fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
@@ -496,7 +498,8 @@ public class TestBlockPlacementPolicyRai
       FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
       Block block) throws IOException {
     INodeFile inode = namesystem.blockManager.blocksMap.getINode(block);
-    return policy.getCompanionBlocks(inode.getFullPathName(), block);
+    FileType type = policy.getFileType(inode.getFullPathName());
+    return policy.getCompanionBlocks(inode.getFullPathName(), type, block);
   }
 
   private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file) 
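
The added waitReplication() call fixes a race in the test rather than in the
policy: DFSTestUtil.createFile() returns before every replica has been reported
to the NameNode, so a test that immediately reads block locations can observe
fewer replicas than it asked for. A hedged sketch of the create-then-wait
pattern, reusing the DFSTestUtil calls from the diff (the path is hypothetical
and fs is assumed to come from a running MiniDFSCluster):

    Path parity = new Path("/raid/user/foo/part-00000");  // hypothetical path
    int numBlocks = 6;
    DFSTestUtil.createFile(fs, parity, numBlocks, (short)2, 0L);
    // Replicas arrive asynchronously via block reports; wait for the target
    // replication before asserting anything about block locations.
    DFSTestUtil.waitReplication(fs, parity, (short)2);
    FileStatus stat = fs.getFileStatus(parity);
    BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());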


