hadoop-hdfs-commits mailing list archives

From: sur...@apache.org
Subject: svn commit: r1074727 [1/2] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/security/token/block/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/...
Date: Fri, 25 Feb 2011 22:42:40 GMT
Author: suresh
Date: Fri Feb 25 22:42:39 2011
New Revision: 1074727

URL: http://svn.apache.org/viewvc?rev=1074727&view=rev
Log:
HDFS-1639. Add block pool management to FSDataset. Contributed by Suresh Srinivas.

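This commit threads a block pool ID ("bpid") through FSDataset and its callers
so that a single datanode can serve multiple federated block pools: each pool
is registered with the dataset, and replica lookups, block reports and on-disk
paths become per-pool. As orientation for the hunks below, a minimal sketch of
the new call pattern. The pool id value is made up, and calling addBlockPool()
directly on the dataset is an assumption based on the DataNode hunk (the
FSDataset/FSDatasetInterface declarations for it are presumably in part 2 of
this message):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
    import org.apache.hadoop.hdfs.server.datanode.FSDataset;

    class PerPoolSketch {
      // Illustrative only; in practice FSDataset is driven by DataNode.
      static void perPoolFlow(FSDataset data, Configuration conf)
          throws IOException {
        String bpid = "BP-example";     // hypothetical block pool id
        data.addBlockPool(bpid, conf);  // set up <volume>/current/<bpid> on each volume
        BlockListAsLongs report = data.getBlockReport(bpid);  // report is now per pool
      }
    }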

Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Fri Feb 25 22:42:39 2011
@@ -39,6 +39,8 @@ Trunk (unreleased changes)
 
     HDFS-1647. Federation: Multiple namenode configuration. (jitendra)
 
+    HDFS-1639. Add block pool management to FSDataset. (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Fri Feb 25 22:42:39 2011
@@ -49,16 +49,6 @@ public class ExtendedBlock implements Wr
     this(null, 0, 0, 0);
   }
 
-  // TODO:FEDERATION To remove when block pool ID related coding is complete
-  public ExtendedBlock(final Block b) {
-    this("TODO", b);
-  }
-  
-  // TODO:FEDERATION To remove when block pool ID related coding is complete
-  public ExtendedBlock(final long blkId) {
-    this("TODO", new Block(blkId));
-  }
-  
   public ExtendedBlock(final ExtendedBlock b) {
     this(b.poolId, b.block);
   }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Feb 25 22:42:39 2011
@@ -55,9 +55,8 @@ public class LocatedBlock implements Wri
     this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
   }
 
-  // TODO:FEDERATION To remove when block pool ID related coding is complete
-  public LocatedBlock(Block b, DatanodeInfo[] locs) {
-    this(new ExtendedBlock(b), locs, -1, false); // startOffset is unknown
+  public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
+    this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
   }
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {

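With the temporary Block-only constructors removed above, callers must name
the pool explicitly when building an ExtendedBlock or LocatedBlock. A small
sketch of the resulting pattern; the bpid value is a made-up example, and the
"TODO" strings in later hunks stand in for exactly this argument:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class PoolAwareBlockSketch {
      static LocatedBlock locate(String bpid, Block b, DatanodeInfo[] locs) {
        ExtendedBlock eb = new ExtendedBlock(bpid, b);  // pool id is now required
        return new LocatedBlock(eb, locs);  // effectively new LocatedBlock(bpid, b, locs)
      }
    }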
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Feb 25 22:42:39 2011
@@ -195,7 +195,7 @@ public class BlockTokenSecretManager ext
    */
   public void checkAccess(Token<BlockTokenIdentifier> blockToken, String userId,
       Block block, AccessMode mode) throws InvalidToken {
-    checkAccess(blockToken, userId, new ExtendedBlock(block), mode);
+    checkAccess(blockToken, userId, new ExtendedBlock("TODO", block), mode);
   }
   
   /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Feb 25 22:42:39 2011
@@ -379,7 +379,7 @@ public class Balancer implements Tool {
       }
       // TODO:FEDERATION use ExtendedBlock in BalancerBlock
       DataTransferProtocol.Sender.opReplaceBlock(out,
-          new ExtendedBlock(block.getBlock()), source.getStorageID(), 
+          new ExtendedBlock("TODO", block.getBlock()), source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Feb 25 22:42:39 2011
@@ -94,7 +94,8 @@ class BlockSender implements java.io.Clo
     try {
       this.block = block;
       synchronized(datanode.data) { 
-        this.replica = datanode.data.getReplica(block.getBlockId());
+        this.replica = datanode.data.getReplica(block.getPoolId(), 
+            block.getBlockId());
         if (replica == null) {
           throw new ReplicaNotFoundException(block);
         }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Feb 25 22:42:39 2011
@@ -213,8 +213,9 @@ class DataBlockScanner implements Runnab
 
   private void init() {
     
+    // TODO:FEDERATION block scanner must work one BP at a time
     // get the list of blocks and arrange them in random order
-    List<Block> arr = dataset.getFinalizedBlocks();
+    List<Block> arr = dataset.getFinalizedBlocks("TODO");
     Collections.shuffle(arr);
     
     blockInfoSet = new TreeSet<BlockScanInfo>();
@@ -231,6 +232,8 @@ class DataBlockScanner implements Runnab
     /* Pick the first directory that has any existing scanner log.
      * otherwise, pick the first directory.
      */
+    // TODO:FEDERATION currently picking only one block pool directory
+    // This needs to change to include all the block pool directories
     File dir = null;
     FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
     for(FSDataset.FSVolume vol : volumes) {
@@ -462,7 +465,7 @@ class DataBlockScanner implements Runnab
         updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
 
         // If the block does not exists anymore, then its not an error
-        if ( dataset.getFile(block.getLocalBlock()) == null ) {
+        if ( dataset.getFile(block.getPoolId(), block.getLocalBlock()) == null ) {
           LOG.info("Verification failed for " + block + ". Its ok since " +
           "it not in datanode dataset anymore.");
           deleteBlock(block.getPoolId(), block.getLocalBlock());
@@ -506,7 +509,7 @@ class DataBlockScanner implements Runnab
     
     if ( block != null ) {
       // TODO:FEDERATION blockInfoSet should use ExtendedBlock
-      verifyBlock(new ExtendedBlock(block));
+      verifyBlock(new ExtendedBlock("TODO", block));
     }
   }
   

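Per the TODO:FEDERATION comments above, the scanner still targets a single
placeholder pool ("TODO"). A hedged fragment of the per-pool loop those
comments point toward; this is a possible direction, not code from this
commit, and blockPoolIds is an assumed source of registered pool ids:

    // Hypothetical pool-aware scan loop inside DataBlockScanner (sketch only):
    for (String bpid : blockPoolIds) {
      // fetch this pool's finalized replicas, then verify each one as a
      // pool-qualified ExtendedBlock
      for (Block b : dataset.getFinalizedBlocks(bpid)) {
        verifyBlock(new ExtendedBlock(bpid, b));
      }
    }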
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Feb 25 22:42:39 2011
@@ -581,7 +581,7 @@ public class DataNode extends Configured
         bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
       }
       initFsDataSet(conf, dataDirs);
-      //data.addStorage(blockPoolId, storage);
+      data.addBlockPool(blockPoolId, conf);
     }
 
     /**
@@ -681,9 +681,7 @@ public class DataNode extends Configured
         // and can be safely GC'ed.
         //
         long brStartTime = now();
-        BlockListAsLongs bReport = data.getBlockReport(/* TODO:FEDERATION pass blockPoolId*/);
-
-        // TODO:FEDERATION add support for pool ID
+        BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
         cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
             .getBlockListAsLongs());
         long brTime = now() - brStartTime;

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Feb 25 22:42:39 2011
@@ -173,8 +173,9 @@ public class DirectoryScanner {
   void reconcile() {
     scan();
     for (ScanInfo info : diff) {
-      dataset.checkAndUpdate(info.getBlockId(), info.getBlockFile(), info
-          .getMetaFile(), info.getVolume());
+      // TODO:FEDERATION use right block pool Id
+      dataset.checkAndUpdate("TODO", info.getBlockId(), info.getBlockFile(),
+          info.getMetaFile(), info.getVolume());
     }
   }
 
@@ -188,7 +189,8 @@ public class DirectoryScanner {
 
     // Hold FSDataset lock to prevent further changes to the block map
     synchronized(dataset) {
-      Block[] memReport = dataset.getBlockList(false);
+      // TODO:FEDERATION use right block pool Id
+      Block[] memReport = dataset.getBlockList("TODO", false);
       Arrays.sort(memReport); // Sort based on blockId
 
       int d = 0; // index for diskReport

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Feb 25 22:42:39 2011
@@ -31,9 +31,13 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.Map.Entry;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -128,8 +132,8 @@ public class FSDataset implements FSCons
                           boolean resetIdx) throws IOException {
       if (numBlocks < maxBlocksPerDir) {
         File dest = new File(dir, b.getBlockName());
-        File metaData = getMetaFile( src, b );
-        File newmeta = getMetaFile(dest, b);
+        File metaData = getMetaFile(src, b.getGenerationStamp());
+        File newmeta = getMetaFile(dest, b.getGenerationStamp());
         if ( ! metaData.renameTo( newmeta ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
@@ -180,16 +184,16 @@ public class FSDataset implements FSCons
       return children[ lastChildIdx ].addBlock(b, src, true, false); 
     }
 
-    void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) 
+    void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume) 
     throws IOException {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
-          children[i].getVolumeMap(volumeMap, volume);
+          children[i].getVolumeMap(bpid, volumeMap, volume);
         }
       }
 
       recoverTempUnlinkedBlock();
-      volume.addToReplicasMap(volumeMap, dir, true);
+      volume.addToReplicasMap(bpid, volumeMap, dir, true);
     }
         
     /**
@@ -298,18 +302,32 @@ public class FSDataset implements FSCons
     }
   }
 
-  class FSVolume {
-    private File currentDir;
-    private FSDir dataDir;      // directory store Finalized replica
-    private File rbwDir;        // directory store RBW replica
-    private File tmpDir;        // directory store Temporary replica
-    private DF usage;
-    private DU dfsUsage;
-    private long reserved;
-
+  /**
+   * Manages block pool directory under {@link FSVolume}
+   */
+  class BlockPool {
+    private final String bpid;
+    private final FSVolume volume; // volume to which this BlockPool belongs to
+    private final File currentDir; // StorageDirectory/current/bpid/current
+    private final FSDir finalizedDir; // directory store Finalized replica
+    private final File rbwDir; // directory store RBW replica
+    private final File tmpDir; // directory store Temporary replica
     
-    FSVolume(File currentDir, Configuration conf) throws IOException {
-      this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+    // TODO:FEDERATION scalability issue - a thread per DU is needed
+    private final DU dfsUsage;
+
+    /**
+     * 
+     * @param bpid Block pool Id
+     * @param volume {@link FSVolume} to which this BlockPool belongs to
+     * @param currentDir currentDir corresponding to the BlockPool
+     * @param conf
+     * @throws IOException
+     */
+    BlockPool(String bpid, FSVolume volume, File currentDir, Configuration conf)
+        throws IOException {
+      this.bpid = bpid;
+      this.volume = volume;
       this.currentDir = currentDir; 
       File parent = currentDir.getParentFile();
       final File finalizedDir = new File(
@@ -328,7 +346,7 @@ public class FSDataset implements FSCons
       if (rbwDir.exists() && !supportAppends) {
         FileUtil.fullyDelete(rbwDir);
       }
-      this.dataDir = new FSDir(finalizedDir);
+      this.finalizedDir = new FSDir(finalizedDir);
       if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
         if (!rbwDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -339,15 +357,26 @@ public class FSDataset implements FSCons
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
         }
       }
-      this.usage = new DF(parent, conf);
       this.dfsUsage = new DU(parent, conf);
       this.dfsUsage.start();
     }
 
+    File getDirectory() {
+      return currentDir.getParentFile();
+    }
+    
     File getCurrentDir() {
       return currentDir;
     }
     
+    File getFinalizedDir() {
+      return finalizedDir.dir;
+    }
+    
+    File getRbwDir() {
+      return rbwDir;
+    }
+    
     void decDfsUsed(long value) {
       // The caller to this method (BlockFileDeleteTask.run()) does
       // not have locked FSDataset.this yet.
@@ -361,42 +390,12 @@ public class FSDataset implements FSCons
     }
     
     /**
-     * Calculate the capacity of the filesystem, after removing any
-     * reserved capacity.
-     * @return the unreserved number of bytes left in this filesystem. May be zero.
-     */
-    long getCapacity() throws IOException {
-      long remaining = usage.getCapacity() - reserved;
-      return remaining > 0 ? remaining : 0;
-    }
-      
-    long getAvailable() throws IOException {
-      long remaining = getCapacity()-getDfsUsed();
-      long available = usage.getAvailable();
-      if (remaining>available) {
-        remaining = available;
-      }
-      return (remaining > 0) ? remaining : 0;
-    }
-      
-    long getReserved(){
-      return reserved;
-    }
-    
-    String getMount() throws IOException {
-      return usage.getMount();
-    }
-      
-    File getDir() {
-      return dataDir.dir;
-    }
-    
-    /**
      * Temporary files. They get moved to the finalized block directory when
      * the block is finalized.
      */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
+      DataNode.LOG.info("SURESH creating temporary file " + f);
       return FSDataset.createTmpFile(b, f);
     }
 
@@ -410,21 +409,21 @@ public class FSDataset implements FSCons
     }
 
     File addBlock(Block b, File f) throws IOException {
-      File blockFile = dataDir.addBlock(b, f);
-      File metaFile = getMetaFile( blockFile , b);
+      File blockFile = finalizedDir.addBlock(b, f);
+      File metaFile = getMetaFile(blockFile , b.getGenerationStamp());
       dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
       return blockFile;
     }
       
     void checkDirs() throws DiskErrorException {
-      dataDir.checkDirTree();
+      finalizedDir.checkDirTree();
       DiskChecker.checkDir(tmpDir);
       DiskChecker.checkDir(rbwDir);
     }
       
     void getVolumeMap(ReplicasMap volumeMap) throws IOException {
       // add finalized replicas
-      dataDir.getVolumeMap(volumeMap, this);
+      finalizedDir.getVolumeMap(bpid, volumeMap, volume);
       // add rbw replicas
       addToReplicasMap(volumeMap, rbwDir, false);
     }
@@ -448,14 +447,14 @@ public class FSDataset implements FSCons
         ReplicaInfo newReplica = null;
         if (isFinalized) {
           newReplica = new FinalizedReplica(blockId, 
-              blockFile.length(), genStamp, this, blockFile.getParentFile());
+              blockFile.length(), genStamp, volume, blockFile.getParentFile());
         } else {
           newReplica = new ReplicaWaitingToBeRecovered(blockId,
               validateIntegrity(blockFile, genStamp), 
-              genStamp, this, blockFile.getParentFile());
+              genStamp, volume, blockFile.getParentFile());
         }
 
-        ReplicaInfo oldReplica = volumeMap.add(newReplica);
+        ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
         if (oldReplica != null) {
           DataNode.LOG.warn("Two block files with the same block id exist " +
               "on disk: " + oldReplica.getBlockFile() +
@@ -533,11 +532,182 @@ public class FSDataset implements FSCons
     }
       
     void clearPath(File f) {
-      dataDir.clearPath(f);
+      finalizedDir.clearPath(f);
+    }
+      
+    public String toString() {
+      return currentDir.getAbsolutePath();
+    }
+    
+    public void shutdown() {
+      dfsUsage.shutdown();
+    }
+  }
+  
+  class FSVolume {
+    private final Map<String, BlockPool> map = new HashMap<String, BlockPool>();
+    private final File currentDir;    // <StorageDirectory>/current
+    private final DF usage;           
+    private final long reserved;
+    
+    FSVolume(File currentDir, Configuration conf) throws IOException {
+      this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+      this.currentDir = currentDir; 
+      File parent = currentDir.getParentFile();
+      this.usage = new DF(parent, conf);
+    }
+
+    /** Return storage directory corresponding to the volume */
+    public File getDir() {
+      return currentDir.getParentFile();
+    }
+    
+    public File getCurrentDir() {
+      return currentDir;
+    }
+    
+    public File getRbwDir(String bpid) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      return bp.getRbwDir();
+    }
+    
+    void decDfsUsed(String bpid, long value) {
+      // The caller to this method (BlockFileDeleteTask.run()) does
+      // not have locked FSDataset.this yet.
+      synchronized(FSDataset.this) {
+        BlockPool bp = map.get(bpid);
+        if (bp != null) {
+          bp.decDfsUsed(value);
+        }
+      }
+    }
+    
+    long getDfsUsed() throws IOException {
+      // TODO valid synchronization
+      long dfsUsed = 0;
+      Set<Entry<String, BlockPool>> set = map.entrySet();
+      for (Entry<String, BlockPool> entry : set) {
+        dfsUsed += entry.getValue().getDfsUsed();
+      }
+      return dfsUsed;
+    }
+    
+    /**
+     * Calculate the capacity of the filesystem, after removing any
+     * reserved capacity.
+     * @return the unreserved number of bytes left in this filesystem. May be zero.
+     */
+    long getCapacity() throws IOException {
+      long remaining = usage.getCapacity() - reserved;
+      return remaining > 0 ? remaining : 0;
+    }
+      
+    long getAvailable() throws IOException {
+      long remaining = getCapacity()-getDfsUsed();
+      long available = usage.getAvailable();
+      if (remaining>available) {
+        remaining = available;
+      }
+      return (remaining > 0) ? remaining : 0;
+    }
+      
+    long getReserved(){
+      return reserved;
+    }
+    
+    String getMount() throws IOException {
+      return usage.getMount();
+    }
+    
+    private BlockPool getBlockPool(String bpid) throws IOException {
+      BlockPool bp = map.get(bpid);
+      if (bp == null) {
+        // TODO:FEDERATION cleanup this exception
+        throw new IOException("block pool " + bpid + " not found");
+      }
+      return bp;
+    }
+    
+    /**
+     * Temporary files. They get moved to the finalized block directory when
+     * the block is finalized.
+     */
+    File createTmpFile(String bpid, Block b) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      return bp.createTmpFile(b);
+    }
+
+    /**
+     * RBW files. They get moved to the finalized block directory when
+     * the block is finalized.
+     */
+    File createRbwFile(String bpid, Block b) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      return bp.createRbwFile(b);
+    }
+
+    File addBlock(String bpid, Block b, File f) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      return bp.addBlock(b, f);
+    }
+      
+    void checkDirs() throws DiskErrorException {
+      // TODO:FEDERATION valid synchronization
+      Set<Entry<String, BlockPool>> set = map.entrySet();
+      for (Entry<String, BlockPool> entry : set) {
+        entry.getValue().checkDirs();
+      }
+    }
+      
+    void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+      Set<Entry<String, BlockPool>> set = map.entrySet();
+      for (Entry<String, BlockPool> entry : set) {
+        entry.getValue().getVolumeMap(volumeMap);
+      }
+    }
+    
+    void getVolumeMap(String bpid, ReplicasMap volumeMap) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      bp.getVolumeMap(volumeMap);
+    }
+    
+    /**
+     * Add replicas under the given directory to the volume map
+     * @param volumeMap the replicas map
+     * @param dir an input directory
+     * @param isFinalized true if the directory has finalized replicas;
+     *                    false if the directory has rbw replicas
+     * @throws IOException 
+     */
+    private void addToReplicasMap(String bpid, ReplicasMap volumeMap, 
+        File dir, boolean isFinalized) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      // TODO move this up
+      // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
+      bp.addToReplicasMap(volumeMap, dir, isFinalized);
+    }
+    
+    void clearPath(String bpid, File f) throws IOException {
+      BlockPool bp = getBlockPool(bpid);
+      bp.clearPath(f);
     }
       
     public String toString() {
-      return getDir().getAbsolutePath();
+      return currentDir.getAbsolutePath();
+    }
+
+    public void shutdown() {
+      Set<Entry<String, BlockPool>> set = map.entrySet();
+      for (Entry<String, BlockPool> entry : set) {
+        entry.getValue().shutdown();
+      }
+    }
+
+    public void addBlockPool(String bpid, Configuration conf)
+        throws IOException {
+      File bpdir = new File(currentDir, bpid);
+      BlockPool bp = new BlockPool(bpid, this, bpdir, conf);
+      map.put(bpid, bp);
     }
   }
     
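The restructuring above moves per-pool state out of FSVolume into the new
BlockPool class: the volume keeps a HashMap from bpid to BlockPool, delegates
tmp/rbw/finalized file operations to it, and aggregates dfsUsed across pools,
while filesystem-wide figures (capacity, reserved, mount) stay on the volume.
A self-contained sketch of that aggregation shape, using simplified stand-in
types rather than the real HDFS classes:

    import java.util.HashMap;
    import java.util.Map;

    class VolumeSketch {
      static class PoolUsage { long dfsUsed; }  // stand-in for BlockPool

      private final Map<String, PoolUsage> pools =
          new HashMap<String, PoolUsage>();

      void addBlockPool(String bpid) {
        pools.put(bpid, new PoolUsage());  // one BlockPool per bpid per volume
      }

      long getDfsUsed() {  // volume usage is the sum over its pools
        long used = 0;
        for (PoolUsage p : pools.values()) {
          used += p.dfsUsed;
        }
        return used;
      }
    }

Also worth noting from the hunk: each BlockPool starts its own DU thread,
which the TODO:FEDERATION comment there flags as a scalability concern as the
number of pools grows.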
@@ -606,6 +776,13 @@ public class FSDataset implements FSCons
         volumes[idx].getVolumeMap(volumeMap);
       }
     }
+    
+    synchronized void getVolumeMap(String bpid, ReplicasMap volumeMap)
+        throws IOException {
+      for (int idx = 0; idx < volumes.length; idx++) {
+        volumes[idx].getVolumeMap(bpid, volumeMap);
+      }
+    }
       
     /**
      * goes over all the volumes and checkDir eachone of them
@@ -667,6 +844,13 @@ public class FSDataset implements FSCons
       }
       return false;
     }
+
+    public void addBlockPool(String bpid, Configuration conf)
+        throws IOException {
+      for (FSVolume v : volumes) {
+        v.addBlockPool(bpid, conf);
+      }
+    }
   }
   
   //////////////////////////////////////////////////////
@@ -699,12 +883,12 @@ public class FSDataset implements FSCons
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
   }
   
-  static File getMetaFile(File f , Block b) {
-    return new File(getMetaFileName(f.getAbsolutePath(),
-                                    b.getGenerationStamp())); 
+  static File getMetaFile(File f , long genStamp) {
+    return new File(getMetaFileName(f.getAbsolutePath(), genStamp));
   }
-  protected File getMetaFile(Block b) throws IOException {
-    return getMetaFile(getBlockFile(b), b);
+  
+  protected File getMetaFile(ExtendedBlock b) throws IOException {
+    return getMetaFile(getBlockFile(b), b.getGenerationStamp());
   }
 
   /** Find the metadata file for the specified block file.
@@ -763,15 +947,15 @@ public class FSDataset implements FSCons
   }
 
   /** Return the block file for the given ID */ 
-  public File findBlockFile(long blockId) {
-    return getFile(blockId);
+  public File findBlockFile(String bpid, long blockId) {
+    return getFile(bpid, blockId);
   }
 
   @Override // FSDatasetInterface
   public synchronized Block getStoredBlock(String bpid, long blkid)
       throws IOException {
     // TODO:FEDERATION use extended block
-    File blockfile = findBlockFile(blkid);
+    File blockfile = findBlockFile(bpid, blkid);
     if (blockfile == null) {
       return null;
     }
@@ -786,8 +970,8 @@ public class FSDataset implements FSCons
    * @param blockId
    * @return
    */
-  synchronized ReplicaInfo fetchReplicaInfo(long blockId) {
-    ReplicaInfo r = volumeMap.get(blockId);
+  synchronized ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
+    ReplicaInfo r = volumeMap.get(bpid, blockId);
     if(r == null)
       return null;
     switch(r.getState()) {
@@ -808,19 +992,19 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public boolean metaFileExists(ExtendedBlock b) throws IOException {
     // TODO:FEDERATION use ExtendedBlock
-    return getMetaFile(b.getLocalBlock()).exists();
+    return getMetaFile(b).exists();
   }
   
   @Override // FSDatasetInterface
   public long getMetaDataLength(ExtendedBlock b) throws IOException {
-    File checksumFile = getMetaFile(b.getLocalBlock());
+    File checksumFile = getMetaFile(b);
     return checksumFile.length();
   }
 
   @Override // FSDatasetInterface
   public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
-    File checksumFile = getMetaFile(b.getLocalBlock());
+    File checksumFile = getMetaFile(b);
     return new MetaDataInputStream(new FileInputStream(checksumFile),
                                                     checksumFile.length());
   }
@@ -933,14 +1117,22 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public long getLength(ExtendedBlock b) throws IOException {
-    return getBlockFile(b.getLocalBlock()).length();
+    return getBlockFile(b).length();
   }
 
   /**
    * Get File name for a given block.
    */
-  public synchronized File getBlockFile(Block b) throws IOException {
-    File f = validateBlockFile(b);
+  public File getBlockFile(ExtendedBlock b) throws IOException {
+    return getBlockFile(b.getPoolId(), b.getLocalBlock());
+  }
+  
+  /**
+   * Get File name for a given block.
+   */
+  public synchronized File getBlockFile(String bpid, Block b)
+      throws IOException {
+    File f = validateBlockFile(bpid, b);
     if(f == null) {
       if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
         InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
@@ -953,13 +1145,13 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
-    return new FileInputStream(getBlockFile(b.getLocalBlock()));
+    return new FileInputStream(getBlockFile(b));
   }
 
   @Override // FSDatasetInterface
   public synchronized InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFile(b.getLocalBlock());
+    File blockFile = getBlockFile(b);
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (seekOffset > 0) {
       blockInFile.seek(seekOffset);
@@ -971,11 +1163,20 @@ public class FSDataset implements FSCons
    * Get the meta info of a block stored in volumeMap
    * @param b block
    * @return the meta replica information
+   */
+  private ReplicaInfo getReplicaInfo(ExtendedBlock b) {
+    return volumeMap.get(b.getPoolId(), b.getLocalBlock());
+  }
+  
+  /**
+   * Get the meta info of a block stored in volumeMap
+   * @param b block
+   * @return the meta replica information
    * @throws IOException if no entry is in the map or 
    *                        there is a generation stamp mismatch
    */
-  private ReplicaInfo getReplicaInfo(Block b) throws IOException {
-    ReplicaInfo info = volumeMap.get(b);
+  private ReplicaInfo getReplicaInfo(String bpid, Block b) throws IOException {
+    ReplicaInfo info = volumeMap.get(bpid, b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
@@ -988,7 +1189,7 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, 
                           long blkOffset, long ckoff) throws IOException {
-    ReplicaInfo info = getReplicaInfo(b.getLocalBlock());
+    ReplicaInfo info = getReplicaInfo(b);
     File blockFile = info.getBlockFile();
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (blkOffset > 0) {
@@ -1013,7 +1214,7 @@ public class FSDataset implements FSCons
    * @return - true if the specified block was unlinked or the block
    *           is not in any snapshot.
    */
-  public boolean unlinkBlock(Block block, int numLinks) throws IOException {
+  public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException {
     ReplicaInfo info = null;
 
     synchronized (this) {
@@ -1102,7 +1303,7 @@ public class FSDataset implements FSCons
           " should be greater than the replica " + b + "'s generation stamp");
     }
     // TODO:FEDERATION use ExtendedBlock
-    ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
@@ -1118,13 +1319,15 @@ public class FSDataset implements FSCons
           " expected length is " + expectedBlockLen);
     }
 
-    return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+    return append(b.getPoolId(), (FinalizedReplica)replicaInfo, newGS,
+        b.getNumBytes());
   }
   
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and 
    * bump its generation stamp to be the newGS
    * 
+   * @param bpid block pool Id
    * @param replicaInfo a finalized replica
    * @param newGS new generation stamp
    * @param estimateBlockLen estimate generation stamp
@@ -1132,8 +1335,9 @@ public class FSDataset implements FSCons
    * @throws IOException if moving the replica from finalized directory 
    *         to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo, 
-      long newGS, long estimateBlockLen) throws IOException {
+  private synchronized ReplicaBeingWritten append(String bpid,
+      FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
+      throws IOException {
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
     
@@ -1144,7 +1348,7 @@ public class FSDataset implements FSCons
       throw new DiskOutOfSpaceException("Insufficient space for appending to "
           + replicaInfo);
     }
-    File newBlkFile = new File(v.rbwDir, replicaInfo.getBlockName());
+    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
     File oldmeta = replicaInfo.getMetaFile();
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
@@ -1177,14 +1381,14 @@ public class FSDataset implements FSCons
     }
     
     // Replace finalized replica by a RBW replica in replicas map
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(bpid, newReplicaInfo);
     
     return newReplicaInfo;
   }
 
-  private ReplicaInfo recoverCheck(Block b, long newGS, 
+  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
       long expectedBlockLen) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
@@ -1239,12 +1443,12 @@ public class FSDataset implements FSCons
     DataNode.LOG.info("Recover failed append to " + b);
 
     // TODO:FEDERATION use ExtendedBlock
-    ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS,
-        expectedBlockLen);
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
     // change the replica's state/gs etc.
     if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
-      return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+      return append(b.getPoolId(), (FinalizedReplica) replicaInfo, newGS, 
+          b.getNumBytes());
     } else { //RBW
       bumpReplicaGS(replicaInfo, newGS);
       return (ReplicaBeingWritten)replicaInfo;
@@ -1256,13 +1460,13 @@ public class FSDataset implements FSCons
       long expectedBlockLen) throws IOException {
     DataNode.LOG.info("Recover failed close " + b);
     // check replica's state
-    ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS,
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS,
         expectedBlockLen);
     // bump the replica's GS
     bumpReplicaGS(replicaInfo, newGS);
     // finalize the replica if RBW
     if (replicaInfo.getState() == ReplicaState.RBW) {
-      finalizeReplica(replicaInfo);
+      finalizeReplica(b.getPoolId(), replicaInfo);
     }
   }
   
@@ -1296,7 +1500,7 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    ReplicaInfo replicaInfo = volumeMap.get(b.getPoolId(), b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
       " already exists in state " + replicaInfo.getState() +
@@ -1305,10 +1509,10 @@ public class FSDataset implements FSCons
     // create a new block
     FSVolume v = volumes.getNextVolume(b.getNumBytes());
     // create a rbw file to hold block in the designated volume
-    File f = v.createRbwFile(b.getLocalBlock());
+    File f = v.createRbwFile(b.getPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile());
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(b.getPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
   
@@ -1318,7 +1522,7 @@ public class FSDataset implements FSCons
       throws IOException {
     DataNode.LOG.info("Recover the RBW replica " + b);
 
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    ReplicaInfo replicaInfo = volumeMap.get(b.getPoolId(), b.getBlockId());
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
@@ -1364,7 +1568,7 @@ public class FSDataset implements FSCons
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    ReplicaInfo replicaInfo = volumeMap.get(b.getPoolId(), b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
           " already exists in state " + replicaInfo.getState() +
@@ -1373,10 +1577,10 @@ public class FSDataset implements FSCons
     
     FSVolume v = volumes.getNextVolume(b.getNumBytes());
     // create a temporary file to hold block in the designated volume
-    File f = v.createTmpFile(b.getLocalBlock());
+    File f = v.createTmpFile(b.getPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile());
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(b.getPoolId(), newReplicaInfo);
     
     return newReplicaInfo;
   }
@@ -1397,14 +1601,14 @@ public class FSDataset implements FSCons
     channel.position(newPos);
   }
 
-  synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
+  synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException {
     if ( vol == null ) {
-      vol = getReplicaInfo( blk ).getVolume();
+      vol = getReplicaInfo(bpid, blk).getVolume();
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
     }
-    return vol.createTmpFile(blk);
+    return vol.createTmpFile(bpid, blk);
   }
 
   //
@@ -1420,17 +1624,17 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = getReplicaInfo(b.getLocalBlock());
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo.getState() == ReplicaState.FINALIZED) {
       // this is legal, when recovery happens on a file that has
       // been opened for append but never modified
       return;
     }
-    finalizeReplica(replicaInfo);
+    finalizeReplica(b.getPoolId(), replicaInfo);
   }
   
-  private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
-  throws IOException {
+  private synchronized FinalizedReplica finalizeReplica(String bpid,
+      ReplicaInfo replicaInfo) throws IOException {
     FinalizedReplica newReplicaInfo = null;
     if (replicaInfo.getState() == ReplicaState.RUR &&
        ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == 
@@ -1445,10 +1649,10 @@ public class FSDataset implements FSCons
             " for block " + replicaInfo);
       }
 
-      File dest = v.addBlock(replicaInfo, f);
+      File dest = v.addBlock(bpid, replicaInfo, f);
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(bpid, newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -1457,10 +1661,10 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
-      volumeMap.remove(b.getLocalBlock());
+      volumeMap.remove(b.getPoolId(), b.getLocalBlock());
       
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(), 
@@ -1500,12 +1704,17 @@ public class FSDataset implements FSCons
    * Generates a block report from the in-memory block map.
    */
   @Override // FSDatasetInterface
-  public BlockListAsLongs getBlockReport() {
-    ArrayList<ReplicaInfo> finalized =
-      new ArrayList<ReplicaInfo>(volumeMap.size());
+  public BlockListAsLongs getBlockReport(String bpid) {
+    // TODO:FEDERATION volumeMap.size() has not been synchronized - old code
+    int size =  volumeMap.size(bpid);
+    ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
     ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
+    if (size == 0) {
+      return new BlockListAsLongs(finalized, uc);
+    }
+    
     synchronized(this) {
-      for (ReplicaInfo b : volumeMap.replicas()) {
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
         case FINALIZED:
           finalized.add(b);
@@ -1529,13 +1738,15 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Get the block list from in-memory blockmap. Note if <deepcopy>
-   * is false, reference to the block in the volumeMap is returned. This block
-   * should not be changed. Suitable synchronization using {@link FSDataset}
-   * is needed to handle concurrent modification to the block.
-   */
-  synchronized Block[] getBlockList(boolean deepcopy) {
-    Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
+   * Get the block list from in-memory blockmap for a block pool.
+   * 
+   * Note if <deepcopy> is false, reference to the block in the volumeMap is
+   * returned. This block should not be changed. Suitable synchronization using
+   * {@link FSDataset} is needed to handle concurrent modification to the block.
+   */
+  synchronized Block[] getBlockList(String bpid, boolean deepcopy) {
+    Block[] list = volumeMap.replicas(bpid).toArray(
+        new Block[volumeMap.size(bpid)]);
     if (deepcopy) {
       for (int i = 0; i < list.length; i++) {
         list[i] = new Block(list[i]);
@@ -1545,11 +1756,11 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Get the list of finalized blocks from in-memory blockmap.
+   * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
-  synchronized List<Block> getFinalizedBlocks() {
-    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
-    for (ReplicaInfo b : volumeMap.replicas()) {
+  synchronized List<Block> getFinalizedBlocks(String bpid) {
+    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size(bpid));
+    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
       if(b.getState() == ReplicaState.FINALIZED) {
         finalized.add(new Block(b));
       }
@@ -1563,7 +1774,7 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public boolean isValidBlock(ExtendedBlock b) {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo == null || 
         replicaInfo.getState() != ReplicaState.FINALIZED) {
       return false;
@@ -1574,9 +1785,9 @@ public class FSDataset implements FSCons
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(Block b) throws IOException {
+  File validateBlockFile(String bpid, Block b) throws IOException {
     //Should we check for metadata file too?
-    File f = getFile(b);
+    File f = getFile(bpid, b);
     
     if(f != null ) {
       if(f.exists())
@@ -1606,7 +1817,7 @@ public class FSDataset implements FSCons
     }
 
     //check replica's meta file
-    final File metafile = getMetaFile(f, r);
+    final File metafile = getMetaFile(f, r.getGenerationStamp());
     if (!metafile.exists()) {
       throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
     }
@@ -1627,8 +1838,8 @@ public class FSDataset implements FSCons
       File f = null;
       FSVolume v;
       synchronized (this) {
-        f = getFile(invalidBlks[i]);
-        ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
+        f = getFile(bpid, invalidBlks[i]);
+        ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
         if (dinfo == null || 
             dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
@@ -1667,15 +1878,16 @@ public class FSDataset implements FSCons
             (replicaState == ReplicaState.RUR && 
                 ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() == 
                   ReplicaState.FINALIZED)) {
-          v.clearPath(parent);
+          v.clearPath(bpid, parent);
         }
-        volumeMap.remove(invalidBlks[i]);
+        volumeMap.remove(bpid, invalidBlks[i]);
       }
-      File metaFile = getMetaFile( f, invalidBlks[i] );
+      File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
       long dfsBytes = f.length() + metaFile.length();
       
       // Delete the block asynchronously to make sure we can do it fast enough
-      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
+      asyncDiskService.deleteAsync(v, bpid, f, metaFile, dfsBytes,
+          invalidBlks[i].toString());
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
@@ -1685,17 +1897,18 @@ public class FSDataset implements FSCons
   /**
    * Turn the block identifier into a filename; ignore generation stamp!!!
    */
-  public synchronized File getFile(Block b) {
-    return getFile(b.getBlockId());
+  public synchronized File getFile(String bpid, Block b) {
+    return getFile(bpid, b.getBlockId());
   }
 
   /**
    * Turn the block identifier into a filename
+   * @param bpid Block pool Id
    * @param blockId a block's id
    * @return on disk data file path; null if the replica does not exist
    */
-  private File getFile(long blockId) {
-    ReplicaInfo info = volumeMap.get(blockId);
+  private File getFile(String bpid, long blockId) {
+    ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info != null) {
       return info.getBlockFile();
     }
@@ -1720,19 +1933,19 @@ public class FSDataset implements FSCons
     // remove related blocks
     long mlsec = System.currentTimeMillis();
     synchronized (this) {
-      Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
-      while(ib.hasNext()) {
-        ReplicaInfo b = ib.next();
-        total_blocks ++;
-        // check if the volume block belongs to still valid
-        FSVolume vol = b.getVolume();
-        for(FSVolume fv: failed_vols) {
-          if(vol == fv) {
-            DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol " 
-                + vol.dataDir.dir.getAbsolutePath());
-            ib.remove();
-            removed_blocks++;
-            break;
+      for(FSVolume fv: failed_vols) {
+        for (String bpid : fv.map.keySet()) {
+          Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
+          while(ib.hasNext()) {
+            ReplicaInfo b = ib.next();
+            total_blocks ++;
+            // check if the volume block belongs to still valid
+            if(b.getVolume() == fv) {
+              DataNode.LOG.warn("removing " + bpid + ":" + b.getBlockId()
+                  + " from vol " + fv.currentDir.getAbsolutePath());
+              ib.remove();
+              removed_blocks++;
+            }
           }
         }
       }
@@ -1744,7 +1957,7 @@ public class FSDataset implements FSCons
     // report the error
     StringBuilder sb = new StringBuilder();
     for(FSVolume fv : failed_vols) {
-      sb.append(fv.dataDir.dir.getAbsolutePath() + ";");
+      sb.append(fv.currentDir.getAbsolutePath() + ";");
     }
 
     throw  new DiskErrorException("DataNode failed volumes:" + sb);
@@ -1797,7 +2010,7 @@ public class FSDataset implements FSCons
     if(volumes != null) {
       for (FSVolume volume : volumes.volumes) {
         if(volume != null) {
-          volume.dfsUsage.shutdown();
+          volume.shutdown();
         }
       }
     }
@@ -1830,13 +2043,13 @@ public class FSDataset implements FSCons
    * @param diskMetaFile Metadata file from on the disk
    * @param vol Volume of the block file
    */
-  public void checkAndUpdate(long blockId, File diskFile,
+  public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
     DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     synchronized (this) {
-      memBlockInfo = volumeMap.get(blockId);
+      memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
@@ -1860,7 +2073,7 @@ public class FSDataset implements FSCons
         if (!memBlockInfo.getBlockFile().exists()) {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
-          volumeMap.remove(blockId);
+          volumeMap.remove(bpid, blockId);
           if (datanode.blockScanner != null) {
             // TODO:FEDERATION pass the right bpid
             datanode.blockScanner.deleteBlock("TODO", new Block(blockId));
@@ -1883,9 +2096,9 @@ public class FSDataset implements FSCons
         // Block is missing in memory - add the block to volumeMap
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
-        volumeMap.add(diskBlockInfo);
+        volumeMap.add(bpid, diskBlockInfo);
         if (datanode.blockScanner != null) {
-          datanode.blockScanner.addBlock(new ExtendedBlock(diskBlockInfo));
+          datanode.blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
         }
         DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
         return;
@@ -1921,7 +2134,8 @@ public class FSDataset implements FSCons
 
       // Compare generation stamp
       if (memBlockInfo.getGenerationStamp() != diskGS) {
-        File memMetaFile = getMetaFile(diskFile, memBlockInfo);
+        File memMetaFile = getMetaFile(diskFile, 
+            memBlockInfo.getGenerationStamp());
         if (memMetaFile.exists()) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
             DataNode.LOG.warn("Metadata file in memory "
@@ -1957,7 +2171,7 @@ public class FSDataset implements FSCons
     // Send corrupt block report outside the lock
     if (corruptBlock != null) {
       DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
-      LocatedBlock[] blocks = { new LocatedBlock(corruptBlock, dnArr) };
+      LocatedBlock[] blocks = { new LocatedBlock(bpid, corruptBlock, dnArr) };
       try {
         datanode.namenode.reportBadBlocks(blocks);
         DataNode.LOG.warn("Reporting the block " + corruptBlock
@@ -1970,26 +2184,26 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * @deprecated use {@link #fetchReplicaInfo(long)} instead.
+   * @deprecated use {@link #fetchReplicaInfo(String, long)} instead.
    */
   @Override // FSDatasetInterface
   @Deprecated
-  public ReplicaInfo getReplica(long blockId) {
+  public ReplicaInfo getReplica(String bpid, long blockId) {
     assert(Thread.holdsLock(this));
-    return volumeMap.get(blockId);
+    return volumeMap.get(bpid, blockId);
   }
 
   @Override // FSDatasetInterface
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
-    return initReplicaRecovery(
+    return initReplicaRecovery(rBlock.getBlock().getPoolId(),
         volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
   }
 
   /** static version of {@link #initReplicaRecovery(Block, long)}. */
-  static ReplicaRecoveryInfo initReplicaRecovery(
+  static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
       ReplicasMap map, Block block, long recoveryId) throws IOException {
-    final ReplicaInfo replica = map.get(block.getBlockId());
+    final ReplicaInfo replica = map.get(bpid, block.getBlockId());
     DataNode.LOG.info("initReplicaRecovery: block=" + block
         + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
@@ -2044,7 +2258,7 @@ public class FSDataset implements FSCons
     }
     else {
       rur = new ReplicaUnderRecovery(replica, recoveryId);
-      map.add(rur);
+      map.add(bpid, rur);
       DataNode.LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
           + " to " + rur.getState());
@@ -2058,7 +2272,8 @@ public class FSDataset implements FSCons
                                     final long recoveryId,
                                     final long newlength) throws IOException {
     //get replica
-    final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockId());
+    final ReplicaInfo replica = volumeMap.get(oldBlock.getPoolId(), 
+        oldBlock.getBlockId());
     DataNode.LOG.info("updateReplica: block=" + oldBlock
         + ", recoveryId=" + recoveryId
         + ", length=" + newlength
@@ -2086,8 +2301,8 @@ public class FSDataset implements FSCons
     checkReplicaFiles(replica);
 
     //update replica
-    final FinalizedReplica finalized = updateReplicaUnderRecovery(
-        (ReplicaUnderRecovery)replica, recoveryId, newlength);
+    final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
+        .getPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength);
 
     //check replica files after update
     checkReplicaFiles(finalized);
@@ -2095,6 +2310,7 @@ public class FSDataset implements FSCons
   }
 
   private FinalizedReplica updateReplicaUnderRecovery(
+                                          String bpid,
                                           ReplicaUnderRecovery rur,
                                           long recoveryId,
                                           long newlength) throws IOException {
@@ -2121,13 +2337,13 @@ public class FSDataset implements FSCons
    }
 
     // finalize the block
-    return finalizeReplica(rur);
+    return finalizeReplica(bpid, rur);
   }
 
   @Override // FSDatasetInterface
   public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    final Replica replica = volumeMap.get(block.getBlockId());
+    final Replica replica = volumeMap.get(block.getPoolId(), block.getBlockId());
     if (replica == null) {
       throw new ReplicaNotFoundException(block);
     }
@@ -2138,6 +2354,14 @@ public class FSDataset implements FSCons
     }
     return replica.getVisibleLength();
   }
+  
+  public synchronized void addBlockPool(String bpid, Configuration conf)
+      throws IOException {
+    DataNode.LOG.info("Adding block pool " + bpid);
+    volumes.addBlockPool(bpid, conf);
+    volumes.getVolumeMap(bpid, volumeMap);
+  }
+  
   /**
    * Class for representing the Datanode volume information
    */
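
A note on the pattern running through these FSDataset hunks: replica lookups
that used to key on a bare block id now key on a (block pool id, block id)
pair, with the pool id taken from an ExtendedBlock already in hand or threaded
in as a bpid argument by the caller. A minimal sketch of the new lookup shape,
using only accessors that appear in the hunks above (the surrounding fragment
is illustrative, not part of this patch):

    // Illustrative lookup after this change; getPoolId() and the two-argument
    // volumeMap.get() are exactly the forms used in the hunks above.
    ReplicaInfo replica = volumeMap.get(block.getPoolId(), block.getBlockId());
    if (replica == null) {
      throw new ReplicaNotFoundException(block);
    }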

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Fri Feb 25 22:42:39 2011
@@ -147,12 +147,12 @@ class FSDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FSDataset.FSVolume volume, File blockFile,
+  void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
       File metaFile, long dfsBytes, String blockName) {
     DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
         + " for deletion");
     ReplicaFileDeleteTask deletionTask = 
-        new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
+        new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, dfsBytes,
             blockName);
     execute(volume.getCurrentDir(), deletionTask);
   }
@@ -161,16 +161,17 @@ class FSDatasetAsyncDiskService {
    *  as decrement the dfs usage of the volume. 
    */
   static class ReplicaFileDeleteTask implements Runnable {
-
-    FSDataset.FSVolume volume;
-    File blockFile;
-    File metaFile;
-    long dfsBytes;
-    String blockName;
+    final FSDataset.FSVolume volume;
+    final String blockPoolId;
+    final File blockFile;
+    final File metaFile;
+    final long dfsBytes;
+    final String blockName;
     
-    ReplicaFileDeleteTask(FSDataset.FSVolume volume, File blockFile,
-        File metaFile, long dfsBytes, String blockName) {
+    ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
+        File blockFile, File metaFile, long dfsBytes, String blockName) {
       this.volume = volume;
+      this.blockPoolId = bpid;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
       this.dfsBytes = dfsBytes;
@@ -184,18 +185,21 @@ class FSDatasetAsyncDiskService {
     @Override
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
-      return "deletion of block " + blockName + " with block file " + blockFile
-          + " and meta file " + metaFile + " from volume " + volume;
+      return "deletion of block " + blockPoolId + " " + blockName
+          + " with block file " + blockFile + " and meta file " + metaFile
+          + " from volume " + volume;
     }
 
     @Override
     public void run() {
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
-            + blockName + " at file " + blockFile + ". Ignored.");
+            + blockPoolId + " " + blockName + " at file " + blockFile
+            + ". Ignored.");
       } else {
-        volume.decDfsUsed(dfsBytes);
-        DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+        volume.decDfsUsed(blockPoolId, dfsBytes);
+        DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
+            + " at file " + blockFile);
       }
     }
   };
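
Why ReplicaFileDeleteTask now stores the pool id in a final field: the delete
runs later, on the volume's async executor, after the scheduling call has
returned, so everything the per-pool accounting needs must be captured up
front. A self-contained sketch of that capture with a stand-in Volume type;
only the decDfsUsed(bpid, bytes) shape mirrors this patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class AsyncDeleteSketch {
      // Stand-in for FSDataset.FSVolume, not the real class.
      interface Volume { void decDfsUsed(String bpid, long bytes); }

      public static void main(String[] args) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        final Volume vol = new Volume() {
          public void decDfsUsed(String bpid, long bytes) {
            System.out.println(bpid + ": freed " + bytes + " bytes");
          }
        };
        final String bpid = "BP-example";   // captured like blockPoolId above
        final long dfsBytes = 1024L;
        exec.execute(new Runnable() {
          public void run() {               // runs after the caller returns
            vol.decDfsUsed(bpid, dfsBytes);
          }
        });
        exec.shutdown();
      }
    }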

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Fri Feb 25 22:42:39 2011
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -103,12 +104,12 @@ public interface FSDatasetInterface exte
    * @return replica from the replicas map
    */
   @Deprecated
-  public Replica getReplica(long blockId);
+  public Replica getReplica(String bpid, long blockId);
 
   /**
    * @return the generation stamp stored with the block.
    */
-  public Block getStoredBlock(String poolId, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException;
 
   /**
@@ -271,10 +272,12 @@ public interface FSDatasetInterface exte
   public void unfinalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
-   * Returns the block report - the full list of blocks stored
+   * Returns the block report, the full list of blocks stored under a
+   * block pool.
+   * @param bpid block pool id
    * @return - the block report - the full list of blocks stored
    */
-  public BlockListAsLongs getBlockReport();
+  public BlockListAsLongs getBlockReport(String bpid);
 
   /**
    * Is the block valid?
@@ -285,10 +288,11 @@ public interface FSDatasetInterface exte
 
   /**
    * Invalidates the specified blocks
+   * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
    */
-  public void invalidate(String poolId, Block invalidBlks[]) throws IOException;
+  public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
 
     /**
      * Check if all the data directories are healthy
@@ -330,12 +334,11 @@ public interface FSDatasetInterface exte
 
   /**
    * Initialize a replica recovery.
-   * 
    * @return actual state of the replica on this data-node or 
    * null if data-node does not have the replica.
    */
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
-  throws IOException;
+      throws IOException;
 
   /**
    * Update replica's generation stamp and length and finalize it.
@@ -344,4 +347,10 @@ public interface FSDatasetInterface exte
                                           ExtendedBlock oldBlock,
                                           long recoveryId,
                                           long newLength) throws IOException;
+  /**
+   * Add a new block pool.
+   * @param bpid block pool id
+   * @param conf configuration for the block pool
+   */
+  public void addBlockPool(String bpid, Configuration conf) throws IOException;
 }
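
With the block report keyed by pool, a federated datanode can report each
namespace's blocks to the namenode that owns it, independently of the other
pools it serves. A hedged fragment of the reporting loop this enables; the
loop, the pool collection, and the send step are illustrative, and only
getBlockReport(String) comes from this interface:

    // Illustrative per-pool reporting loop; "knownBlockPools" is assumed.
    for (String bpid : knownBlockPools) {
      BlockListAsLongs report = dataset.getBlockReport(bpid);
      // ... send the report to the namenode that owns this block pool
    }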

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Fri Feb 25 22:42:39 2011
@@ -19,24 +19,39 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 
 class ReplicasMap {
-  // HashMap: maps a block id to the replica's meta info
-  private HashMap<Long, ReplicaInfo> map = new HashMap<Long, ReplicaInfo>();
+  // Map of block pool Id to another map of block Id to ReplicaInfo.
+  private Map<String, Map<Long, ReplicaInfo>> map = 
+    new HashMap<String, Map<Long, ReplicaInfo>>();
+  
+  private void checkBlockPool(String bpid) {
+    if (bpid == null) {
+      throw new IllegalArgumentException("Block Pool Id is null");
+    }
+  }
+  
+  private void checkBlock(Block b) {
+    if (b == null) {
+      throw new IllegalArgumentException("Block is null");
+    }
+  }
+  
   /**
    * Get the meta information of the replica that matches both block id 
    * and generation stamp
+   * @param bpid block pool id
    * @param block block with its id as the key
    * @return the replica's meta information
-   * @throws IllegalArgumentException if the input block is null
+   * @throws IllegalArgumentException if the input block or block pool is null
    */
-  ReplicaInfo get(Block block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Do not expect null block");
-    }
-    ReplicaInfo replicaInfo = get(block.getBlockId());
+  ReplicaInfo get(String bpid, Block block) {
+    checkBlockPool(bpid);
+    checkBlock(block);
+    ReplicaInfo replicaInfo = get(bpid, block.getBlockId());
     if (replicaInfo != null && 
         block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
       return replicaInfo;
@@ -44,72 +59,95 @@ class ReplicasMap {
     return null;
   }
   
+  
   /**
    * Get the meta information of the replica that matches the block id
+   * @param bpid block pool id
    * @param blockId a block's id
    * @return the replica's meta information
    */
-  ReplicaInfo get(long blockId) {
-    return map.get(blockId);
+  ReplicaInfo get(String bpid, long blockId) {
+    checkBlockPool(bpid);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    return m != null ? m.get(blockId) : null;
   }
   
   /**
    * Add a replica's meta information into the map 
    * 
+   * @param bpid block pool id
    * @param replicaInfo a replica's meta information
    * @return previous meta information of the replica
    * @throws IllegalArgumentException if the input parameter is null
    */
-  ReplicaInfo add(ReplicaInfo replicaInfo) {
-    if (replicaInfo == null) {
-      throw new IllegalArgumentException("Do not expect null block");
+  ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    if (m == null) {
+      // Add an entry for block pool if it does not exist already
+      m = new HashMap<Long, ReplicaInfo>();
+      map.put(bpid, m);
     }
-    return  map.put(replicaInfo.getBlockId(), replicaInfo);
+    return  m.put(replicaInfo.getBlockId(), replicaInfo);
   }
   
   /**
    * Remove the replica's meta information from the map that matches
    * the input block's id and generation stamp
+   * @param bpid block pool id
    * @param block block with its id as the key
    * @return the removed replica's meta information
    * @throws IllegalArgumentException if the input block is null
    */
-  ReplicaInfo remove(Block block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Do not expect null block");
-    }
-    Long key = Long.valueOf(block.getBlockId());
-    ReplicaInfo replicaInfo = map.get(key);
-    if (replicaInfo != null &&
-        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-      return remove(key);
-    } 
+  ReplicaInfo remove(String bpid, Block block) {
+    checkBlockPool(bpid);
+    checkBlock(block);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    if (m != null) {
+      Long key = Long.valueOf(block.getBlockId());
+      ReplicaInfo replicaInfo = m.get(key);
+      if (replicaInfo != null &&
+          block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+        return m.remove(key);
+      } 
+    }
     
     return null;
   }
   
   /**
    * Remove the replica's meta information from the map if present
+   * @param bpid block pool id
    * @param blockId block id of the replica to be removed
    * @return the removed replica's meta information
    */
-  ReplicaInfo remove(long blockId) {
-    return map.remove(blockId);
+  ReplicaInfo remove(String bpid, long blockId) {
+    checkBlockPool(bpid);
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    if (m != null) {
+      return m.remove(blockId);
+    }
+    return null;
   }
  
   /**
-   * Get the size of the map
+   * Get the size of the map for given block pool
+   * @param bpid block pool id
    * @return the number of replicas in the map
    */
-  int size() {
-    return map.size();
+  int size(String bpid) {
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    return m != null ? m.size() : 0;
   }
   
   /**
-   * Get a collection of the replicas
+   * Get a collection of the replicas for given block pool
+   * @param bpid block pool id
    * @return a collection of the replicas
    */
-  Collection<ReplicaInfo> replicas() {
-    return map.values();
+  Collection<ReplicaInfo> replicas(String bpid) {
+    Map<Long, ReplicaInfo> m = map.get(bpid);
+    return m != null ? m.values() : null;
   }
-}
+}
\ No newline at end of file
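
The rewritten ReplicasMap is two-level: an outer HashMap keyed by block pool
id over inner per-pool maps keyed by block id. Reads against an unknown pool
now return null (or 0 from size) instead of throwing, a null bpid is always an
IllegalArgumentException, and only add() creates a pool's entry. Note also
that replicas(bpid) can return null where the old replicas() never did, so
callers need a null check. A hypothetical same-package test of that contract;
the pool id and the replica fixture are arbitrary:

    // Hypothetical JUnit-style check; "replica" is some finalized fixture.
    ReplicasMap map = new ReplicasMap();
    assertNull(map.get("bpid", 1L));       // unknown pool: null, not a throw
    assertEquals(0, map.size("bpid"));     // unknown pool counts as empty
    map.add("bpid", replica);              // first add creates the pool entry
    assertEquals(1, map.size("bpid"));
    assertNotNull(map.remove("bpid", replica.getBlockId()));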

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Feb 25 22:42:39 2011
@@ -354,16 +354,11 @@ public class DatanodeDescriptor extends 
     return replicateBlocks.poll(maxTransfers);
   }
 
-  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+  BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
       return null;
-    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
-    for(BlockInfoUnderConstruction b : blocks) {
-      brCommand.add(new RecoveringBlock(
-          new ExtendedBlock(b), b.getExpectedLocations(), b.getBlockRecoveryId()));
-    }
-    return brCommand;
+    return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Feb 25 22:42:39 2011
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -2731,9 +2733,17 @@ public class FSNamesystem implements FSC
         updateStats(nodeinfo, true);
         
         //check lease recovery
-        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (cmd != null) {
-          return new DatanodeCommand[] {cmd};
+        BlockInfoUnderConstruction[] blocks = nodeinfo
+            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (blocks != null) {
+          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
+              blocks.length);
+          for (BlockInfoUnderConstruction b : blocks) {
+            brCommand.add(new RecoveringBlock(
+                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
+                    .getBlockRecoveryId()));
+          }
+          return new DatanodeCommand[] { brCommand };
         }
       
         ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
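
The DatanodeDescriptor and FSNamesystem hunks above are two halves of one
refactor: building a RecoveringBlock now requires an ExtendedBlock, and hence
a blockPoolId the descriptor does not know, so the descriptor hands back the
raw BlockInfoUnderConstruction array and the namesystem assembles the command.
The resulting division of labor, as a fragment using only names from the two
hunks:

    // Descriptor side: raw blocks only, no pool context.
    BlockInfoUnderConstruction[] blocks =
        nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
    // Namesystem side: wrap each block with the pool id it alone knows.
    brCommand.add(new RecoveringBlock(new ExtendedBlock(blockPoolId, b),
        b.getExpectedLocations(), b.getBlockRecoveryId()));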

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java Fri Feb 25 22:42:39 2011
@@ -161,7 +161,7 @@ public class TestFiHftp {
     final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
     LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
     final FSDataset data = (FSDataset)dn.getFSDataset();
-    final File blkfile = data.getBlockFile(blk.getLocalBlock());
+    final File blkfile = data.getBlockFile(blk);
     Assert.assertTrue(blkfile.delete());
 
     //read again by hftp, should get an exception 

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Feb 25 22:42:39 2011
@@ -827,6 +827,7 @@ public class MiniDFSCluster {
 
   /*
    * Corrupt a block on a particular datanode
+   * Types: delete, write bad data, truncate
    */
   boolean corruptBlockOnDataNode(int i, ExtendedBlock blk) throws Exception {
     Random random = new Random();
@@ -1094,11 +1095,12 @@ public class MiniDFSCluster {
    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
    * @return the block report for the specified data node
    */
-  public Iterable<Block> getBlockReport(int dataNodeIndex) {
+  public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
+    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(
+        bpid);
   }
   
   
@@ -1107,11 +1109,11 @@ public class MiniDFSCluster {
    * @return block reports from all data nodes
    *    BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
    */
-  public Iterable<Block>[] getAllBlockReports() {
+  public Iterable<Block>[] getAllBlockReports(String bpid) {
     int numDataNodes = dataNodes.size();
     Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
     for (int i = 0; i < numDataNodes; ++i) {
-     result[i] = getBlockReport(i);
+     result[i] = getBlockReport(bpid, i);
     }
     return result;
   }
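
Test-side usage follows the same pattern: resolve the pool id from the
namesystem first, then ask for per-pool reports. A short fragment using
exactly the calls the TestDFSShell hunk below adopts:

    String poolId = cluster.getNamesystem().getPoolId();
    Iterable<Block>[] reports = cluster.getAllBlockReports(poolId);
    Iterable<Block> first = cluster.getBlockReport(poolId, 0);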

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Fri Feb 25 22:42:39 2011
@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.shell.Count;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset;
 import org.apache.hadoop.io.IOUtils;
@@ -1108,14 +1107,12 @@ public class TestDFSShell extends TestCa
   static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
     List<File> files = new ArrayList<File>();
     List<DataNode> datanodes = cluster.getDataNodes();
-    Iterable<Block>[] blocks = cluster.getAllBlockReports();
-    ExtendedBlock blk = new ExtendedBlock();
     String poolId = cluster.getNamesystem().getPoolId();
+    Iterable<Block>[] blocks = cluster.getAllBlockReports(poolId);
     for(int i = 0; i < blocks.length; i++) {
       FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
       for(Block b : blocks[i]) {
-        blk.set(poolId, b);
-        files.add(ds.getBlockFile(blk.getLocalBlock()));
+        files.add(ds.getBlockFile(poolId, b));
       }        
     }
     return files;

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Feb 25 22:42:39 2011
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -420,7 +419,8 @@ public class TestDataTransferProtocol ex
     
     /* Test OP_READ_BLOCK */
 
-    ExtendedBlock blk = new ExtendedBlock(firstBlock.getLocalBlock());
+    String bpid = cluster.getNamesystem().getPoolId();
+    ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
     long blkid = blk.getBlockId();
     // bad block id
     sendBuf.reset();

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Fri Feb 25 22:42:39 2011
@@ -135,7 +135,7 @@ public class TestFileAppend extends Test
       //
       for (int i = 0; i < blocks.size(); i = i + 2) {
         ExtendedBlock b = blocks.get(i).getBlock();
-        File f = dataset.getFile(b.getLocalBlock());
+        File f = dataset.getFile(b.getPoolId(), b.getLocalBlock());
         File link = new File(f.toString() + ".link");
         System.out.println("Creating hardlink for File " + f + " to " + link);
         HardLink.createHardLink(f, link);
@@ -148,7 +148,7 @@ public class TestFileAppend extends Test
         ExtendedBlock b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned true",
-            dataset.unlinkBlock(b.getLocalBlock(), 1));
+            dataset.unlinkBlock(b, 1));
       }
 
       // Since the blocks were already detached earlier, these calls should
@@ -158,7 +158,7 @@ public class TestFileAppend extends Test
         ExtendedBlock b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned false",
-            !dataset.unlinkBlock(b.getLocalBlock(), 1));
+            !dataset.unlinkBlock(b, 1));
       }
 
     } finally {

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Fri Feb 25 22:42:39 2011
@@ -199,7 +199,7 @@ public class TestFileAppend3 extends jun
     assertEquals(repl, datanodeinfos.length);
     final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
     final FSDataset data = (FSDataset)dn.getFSDataset();
-    final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk.getLocalBlock()), "rw");
+    final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw");
     AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
     assertEquals(len1, raf.length());
     raf.setLength(0);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1074727&r1=1074726&r2=1074727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Fri Feb 25 22:42:39 2011
@@ -27,12 +27,9 @@ import java.util.EnumSet;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -768,7 +765,7 @@ public class TestFileCreation extends ju
         FSDataset dataset = (FSDataset)datanode.data;
         ExtendedBlock blk = locatedblock.getBlock();
         Block b = dataset.getStoredBlock(blk.getPoolId(), blk.getBlockId());
-        File blockfile = dataset.findBlockFile(b.getBlockId());
+        File blockfile = dataset.findBlockFile(blk.getPoolId(), b.getBlockId());
         System.out.println("blockfile=" + blockfile);
         if (blockfile != null) {
           BufferedReader in = new BufferedReader(new FileReader(blockfile));


