hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1099687 [7/15] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/pro...
Date Thu, 05 May 2011 05:40:13 GMT
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu May  5 05:40:07 2011
@@ -31,9 +31,14 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.Map.Entry;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -47,9 +52,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@@ -74,7 +78,6 @@ import org.apache.hadoop.io.IOUtils;
 @InterfaceAudience.Private
 public class FSDataset implements FSConstants, FSDatasetInterface {
 
-
   /**
    * A node type that can be built into a tree reflecting the
    * hierarchy of blocks on the local disk.
@@ -166,16 +169,16 @@ public class FSDataset implements FSCons
       return children[ lastChildIdx ].addBlock(b, src, true, false); 
     }
 
-    void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) 
+    void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume) 
     throws IOException {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
-          children[i].getVolumeMap(volumeMap, volume);
+          children[i].getVolumeMap(bpid, volumeMap, volume);
         }
       }
 
       recoverTempUnlinkedBlock();
-      volume.addToReplicasMap(volumeMap, dir, true);
+      volume.addToReplicasMap(bpid, volumeMap, dir, true);
     }
         
     /**
@@ -284,21 +287,35 @@ public class FSDataset implements FSCons
     }
   }
 
-  class FSVolume {
-    private File currentDir;
-    private FSDir dataDir;      // directory store Finalized replica
-    private File rbwDir;        // directory store RBW replica
-    private File tmpDir;        // directory store Temporary replica
-    private DF usage;
-    private DU dfsUsage;
-    private long reserved;
-
+  /**
+   * A BlockPoolSlice represents a portion of a BlockPool stored on a volume.  
+   * Taken together, all BlockPoolSlices sharing a block pool ID across a 
+   * cluster represent a single block pool.
+   */
+  class BlockPoolSlice {
+    private final String bpid;
+    private final FSVolume volume; // volume to which this BlockPool belongs to
+    private final File currentDir; // StorageDirectory/current/bpid/current
+    private final FSDir finalizedDir; // directory store Finalized replica
+    private final File rbwDir; // directory store RBW replica
+    private final File tmpDir; // directory store Temporary replica
     
-    FSVolume(File currentDir, Configuration conf) throws IOException {
-      this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
-                                   DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
-      this.currentDir = currentDir; 
-      File parent = currentDir.getParentFile();
+    // TODO:FEDERATION scalability issue - a thread per DU is needed
+    private final DU dfsUsage;
+
+    /**
+     * 
+     * @param bpid Block pool Id
+     * @param volume {@link FSVolume} to which this BlockPool belongs to
+     * @param bpDir directory corresponding to the BlockPool
+     * @param conf
+     * @throws IOException
+     */
+    BlockPoolSlice(String bpid, FSVolume volume, File bpDir, Configuration conf)
+        throws IOException {
+      this.bpid = bpid;
+      this.volume = volume;
+      this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
       final File finalizedDir = new File(
           currentDir, DataStorage.STORAGE_DIR_FINALIZED);
 
@@ -307,7 +324,7 @@ public class FSDataset implements FSCons
       // in the future, we might want to do some sort of datanode-local
       // recovery for these blocks. For example, crc validation.
       //
-      this.tmpDir = new File(parent, "tmp");
+      this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
       if (tmpDir.exists()) {
         FileUtil.fullyDelete(tmpDir);
       }
@@ -315,7 +332,7 @@ public class FSDataset implements FSCons
       if (rbwDir.exists() && !supportAppends) {
         FileUtil.fullyDelete(rbwDir);
       }
-      this.dataDir = new FSDir(finalizedDir);
+      this.finalizedDir = new FSDir(finalizedDir);
       if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
         if (!rbwDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -326,15 +343,26 @@ public class FSDataset implements FSCons
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
         }
       }
-      this.usage = new DF(parent, conf);
-      this.dfsUsage = new DU(parent, conf);
+      this.dfsUsage = new DU(bpDir, conf);
       this.dfsUsage.start();
     }
 
+    File getDirectory() {
+      return currentDir.getParentFile();
+    }
+    
     File getCurrentDir() {
       return currentDir;
     }
     
+    File getFinalizedDir() {
+      return finalizedDir.dir;
+    }
+    
+    File getRbwDir() {
+      return rbwDir;
+    }
+    
     void decDfsUsed(long value) {
       // The caller to this method (BlockFileDeleteTask.run()) does
       // not have locked FSDataset.this yet.
@@ -348,37 +376,6 @@ public class FSDataset implements FSCons
     }
     
     /**
-     * Calculate the capacity of the filesystem, after removing any
-     * reserved capacity.
-     * @return the unreserved number of bytes left in this filesystem. May be zero.
-     */
-    long getCapacity() throws IOException {
-      long remaining = usage.getCapacity() - reserved;
-      return remaining > 0 ? remaining : 0;
-    }
-      
-    long getAvailable() throws IOException {
-      long remaining = getCapacity()-getDfsUsed();
-      long available = usage.getAvailable();
-      if (remaining>available) {
-        remaining = available;
-      }
-      return (remaining > 0) ? remaining : 0;
-    }
-      
-    long getReserved(){
-      return reserved;
-    }
-    
-    String getMount() throws IOException {
-      return usage.getMount();
-    }
-      
-    File getDir() {
-      return dataDir.dir;
-    }
-    
-    /**
      * Temporary files. They get moved to the finalized block directory when
      * the block is finalized.
      */
@@ -397,21 +394,21 @@ public class FSDataset implements FSCons
     }
 
     File addBlock(Block b, File f) throws IOException {
-      File blockFile = dataDir.addBlock(b, f);
-      File metaFile = getMetaFile( blockFile , b);
+      File blockFile = finalizedDir.addBlock(b, f);
+      File metaFile = getMetaFile(blockFile , b.getGenerationStamp());
       dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
       return blockFile;
     }
       
     void checkDirs() throws DiskErrorException {
-      dataDir.checkDirTree();
+      finalizedDir.checkDirTree();
       DiskChecker.checkDir(tmpDir);
       DiskChecker.checkDir(rbwDir);
     }
       
     void getVolumeMap(ReplicasMap volumeMap) throws IOException {
       // add finalized replicas
-      dataDir.getVolumeMap(volumeMap, this);
+      finalizedDir.getVolumeMap(bpid, volumeMap, volume);
       // add rbw replicas
       addToReplicasMap(volumeMap, rbwDir, false);
     }
@@ -435,14 +432,14 @@ public class FSDataset implements FSCons
         ReplicaInfo newReplica = null;
         if (isFinalized) {
           newReplica = new FinalizedReplica(blockId, 
-              blockFile.length(), genStamp, this, blockFile.getParentFile());
+              blockFile.length(), genStamp, volume, blockFile.getParentFile());
         } else {
           newReplica = new ReplicaWaitingToBeRecovered(blockId,
               validateIntegrity(blockFile, genStamp), 
-              genStamp, this, blockFile.getParentFile());
+              genStamp, volume, blockFile.getParentFile());
         }
 
-        ReplicaInfo oldReplica = volumeMap.add(newReplica);
+        ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
         if (oldReplica != null) {
           DataNode.LOG.warn("Two block files with the same block id exist " +
               "on disk: " + oldReplica.getBlockFile() +
@@ -520,71 +517,361 @@ public class FSDataset implements FSCons
     }
       
     void clearPath(File f) {
-      dataDir.clearPath(f);
+      finalizedDir.clearPath(f);
     }
       
     public String toString() {
-      return getDir().getAbsolutePath();
+      return currentDir.getAbsolutePath();
+    }
+    
+    public void shutdown() {
+      dfsUsage.shutdown();
+    }
+  }
+  
+  class FSVolume {
+    private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
+    private final File currentDir;    // <StorageDirectory>/current
+    private final DF usage;           
+    private final long reserved;
+    
+    FSVolume(File currentDir, Configuration conf) throws IOException {
+      this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
+                                   DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
+      this.currentDir = currentDir; 
+      File parent = currentDir.getParentFile();
+      this.usage = new DF(parent, conf);
+    }
+
+    /** Return storage directory corresponding to the volume */
+    public File getDir() {
+      return currentDir.getParentFile();
+    }
+    
+    public File getCurrentDir() {
+      return currentDir;
+    }
+    
+    public File getRbwDir(String bpid) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      return bp.getRbwDir();
+    }
+    
+    void decDfsUsed(String bpid, long value) {
+      // The caller to this method (BlockFileDeleteTask.run()) does
+      // not have locked FSDataset.this yet.
+      synchronized(FSDataset.this) {
+        BlockPoolSlice bp = map.get(bpid);
+        if (bp != null) {
+          bp.decDfsUsed(value);
+        }
+      }
+    }
+    
+    long getDfsUsed() throws IOException {
+      // TODO valid synchronization
+      long dfsUsed = 0;
+      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
+      for (Entry<String, BlockPoolSlice> entry : set) {
+        dfsUsed += entry.getValue().getDfsUsed();
+      }
+      return dfsUsed;
+    }
+    
+    long getBlockPoolUsed(String bpid) throws IOException {
+      return getBlockPoolSlice(bpid).getDfsUsed();
+    }
+    
+    /**
+     * Calculate the capacity of the filesystem, after removing any
+     * reserved capacity.
+     * @return the unreserved number of bytes left in this filesystem. May be zero.
+     */
+    long getCapacity() throws IOException {
+      long remaining = usage.getCapacity() - reserved;
+      return remaining > 0 ? remaining : 0;
+    }
+      
+    long getAvailable() throws IOException {
+      long remaining = getCapacity()-getDfsUsed();
+      long available = usage.getAvailable();
+      if (remaining>available) {
+        remaining = available;
+      }
+      return (remaining > 0) ? remaining : 0;
+    }
+      
+    long getReserved(){
+      return reserved;
+    }
+    
+    String getMount() throws IOException {
+      return usage.getMount();
+    }
+    
+    BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
+      BlockPoolSlice bp = map.get(bpid);
+      if (bp == null) {
+        throw new IOException("block pool " + bpid + " is not found");
+      }
+      return bp;
+    }
+    
+    /**
+     * Make a deep copy of the list of currently active BPIDs
+     */
+    String[] getBlockPoolList() {
+      synchronized(FSDataset.this) {
+        return map.keySet().toArray(new String[map.keySet().size()]);   
+      }
+    }
+      
+    /**
+     * Temporary files. They get moved to the finalized block directory when
+     * the block is finalized.
+     */
+    File createTmpFile(String bpid, Block b) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      return bp.createTmpFile(b);
+    }
+
+    /**
+     * RBW files. They get moved to the finalized block directory when
+     * the block is finalized.
+     */
+    File createRbwFile(String bpid, Block b) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      return bp.createRbwFile(b);
+    }
+
+    File addBlock(String bpid, Block b, File f) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      return bp.addBlock(b, f);
+    }
+      
+    void checkDirs() throws DiskErrorException {
+      // TODO:FEDERATION valid synchronization
+      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
+      for (Entry<String, BlockPoolSlice> entry : set) {
+        entry.getValue().checkDirs();
+      }
+    }
+      
+    void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
+      for (Entry<String, BlockPoolSlice> entry : set) {
+        entry.getValue().getVolumeMap(volumeMap);
+      }
+    }
+    
+    void getVolumeMap(String bpid, ReplicasMap volumeMap) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      bp.getVolumeMap(volumeMap);
+    }
+    
+    /**
+     * Add replicas under the given directory to the volume map
+     * @param volumeMap the replicas map
+     * @param dir an input directory
+     * @param isFinalized true if the directory has finalized replicas;
+     *                    false if the directory has rbw replicas
+     * @throws IOException 
+     */
+    private void addToReplicasMap(String bpid, ReplicasMap volumeMap, 
+        File dir, boolean isFinalized) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      // TODO move this up
+      // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
+      bp.addToReplicasMap(volumeMap, dir, isFinalized);
+    }
+    
+    void clearPath(String bpid, File f) throws IOException {
+      BlockPoolSlice bp = getBlockPoolSlice(bpid);
+      bp.clearPath(f);
+    }
+      
+    public String toString() {
+      return currentDir.getAbsolutePath();
+    }
+
+    public void shutdown() {
+      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
+      for (Entry<String, BlockPoolSlice> entry : set) {
+        entry.getValue().shutdown();
+      }
+    }
+
+    public void addBlockPool(String bpid, Configuration conf)
+        throws IOException {
+      File bpdir = new File(currentDir, bpid);
+      BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+      map.put(bpid, bp);
+    }
+    
+    public void shutdownBlockPool(String bpid) {
+      BlockPoolSlice bp = map.get(bpid);
+      if (bp!=null) {
+        bp.shutdown();
+      }
+      map.remove(bpid);
+    }
+
+    private boolean isBPDirEmpty(String bpid)
+        throws IOException {
+      File volumeCurrentDir = this.getCurrentDir();
+      File bpDir = new File(volumeCurrentDir, bpid);
+      File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+      File finalizedDir = new File(bpCurrentDir,
+          DataStorage.STORAGE_DIR_FINALIZED);
+      File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
+      if (finalizedDir.exists() && finalizedDir.list().length != 0) {
+        return false;
+      }
+      if (rbwDir.exists() && rbwDir.list().length != 0) {
+        return false;
+      }
+      return true;
+    }
+    
+    private void deleteBPDirectories(String bpid, boolean force)
+        throws IOException {
+      File volumeCurrentDir = this.getCurrentDir();
+      File bpDir = new File(volumeCurrentDir, bpid);
+      if (!bpDir.isDirectory()) {
+        // nothing to be deleted
+        return;
+      }
+      File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
+      File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+      File finalizedDir = new File(bpCurrentDir,
+          DataStorage.STORAGE_DIR_FINALIZED);
+      File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
+      if (force) {
+        FileUtil.fullyDelete(bpDir);
+      } else {
+        if (!rbwDir.delete()) {
+          throw new IOException("Failed to delete " + rbwDir);
+        }
+        if (!finalizedDir.delete()) {
+          throw new IOException("Failed to delete " + finalizedDir);
+        }
+        FileUtil.fullyDelete(tmpDir);
+        for (File f : bpCurrentDir.listFiles()) {
+          if (!f.delete()) {
+            throw new IOException("Failed to delete " + f);
+          }
+        }
+        if (!bpCurrentDir.delete()) {
+          throw new IOException("Failed to delete " + bpCurrentDir);
+        }
+        for (File f : bpDir.listFiles()) {
+          if (!f.delete()) {
+            throw new IOException("Failed to delete " + f);
+          }
+        }
+        if (!bpDir.delete()) {
+          throw new IOException("Failed to delete " + bpDir);
+        }
+      }
     }
   }
     
   static class FSVolumeSet {
-    FSVolume[] volumes = null;
+    /*
+     * Read access to this unmodifiable list is not synchronized.
+     * This list is replaced on modification holding "this" lock.
+     */
+    private volatile List<FSVolume> volumes = null;
     BlockVolumeChoosingPolicy blockChooser;
-      
+    int numFailedVolumes = 0;
+
     FSVolumeSet(FSVolume[] volumes, BlockVolumeChoosingPolicy blockChooser) {
-      this.volumes = volumes;
+      List<FSVolume> list = Arrays.asList(volumes);
+      this.volumes = Collections.unmodifiableList(list);
       this.blockChooser = blockChooser;
     }
     
     private int numberOfVolumes() {
-      return volumes.length;
+      return volumes.size();
     }
-      
+
+    private int numberOfFailedVolumes() {
+      return numFailedVolumes;
+    }
+    
+    /** 
+     * Get next volume. Synchronized to ensure {@link #curVolume} is updated
+     * by a single thread and next volume is chosen with no concurrent
+     * update to {@link #volumes}.
+     * @param blockSize free space needed on the volume
+     * @return next volume to store the block in.
+     */
     synchronized FSVolume getNextVolume(long blockSize) throws IOException {
       return blockChooser.chooseVolume(volumes, blockSize);
     }
       
-    long getDfsUsed() throws IOException {
+    private long getDfsUsed() throws IOException {
       long dfsUsed = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        dfsUsed += volumes[idx].getDfsUsed();
+      for (FSVolume vol : volumes) {
+        dfsUsed += vol.getDfsUsed();
       }
       return dfsUsed;
     }
 
-    long getCapacity() throws IOException {
+    private long getBlockPoolUsed(String bpid) throws IOException {
+      long dfsUsed = 0L;
+      for (FSVolume vol : volumes) {
+        dfsUsed += vol.getBlockPoolUsed(bpid);
+      }
+      return dfsUsed;
+    }
+
+    private long getCapacity() throws IOException {
       long capacity = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        capacity += volumes[idx].getCapacity();
+      for (FSVolume vol : volumes) {
+        capacity += vol.getCapacity();
       }
       return capacity;
     }
       
-    long getRemaining() throws IOException {
+    private long getRemaining() throws IOException {
       long remaining = 0L;
-      for (int idx = 0; idx < volumes.length; idx++) {
-        remaining += volumes[idx].getAvailable();
+      for (FSVolume vol : volumes) {
+        remaining += vol.getAvailable();
       }
       return remaining;
     }
       
-    synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].getVolumeMap(volumeMap);
+    private void getVolumeMap(ReplicasMap volumeMap)
+        throws IOException {
+      for (FSVolume vol : volumes) {
+        vol.getVolumeMap(volumeMap);
+      }
+    }
+    
+    private void getVolumeMap(String bpid, ReplicasMap volumeMap)
+        throws IOException {
+      for (FSVolume vol : volumes) {
+        vol.getVolumeMap(bpid, volumeMap);
       }
     }
       
     /**
      * Calls {@link FSVolume#checkDirs()} on each volume, removing any
      * volumes from the active list that result in a DiskErrorException.
+     * 
+     * This method is synchronized to allow only one instance of checkDirs() 
+     * call
      * @return list of all the removed volumes.
      */
-    synchronized List<FSVolume> checkDirs() {
-      ArrayList<FSVolume> removedVols = null;  
+    private synchronized List<FSVolume> checkDirs() {
+      ArrayList<FSVolume> removedVols = null;
       
-      for (int idx = 0; idx < volumes.length; idx++) {
-        FSVolume fsv = volumes[idx];
+      // Make a copy of volumes for performing modification 
+      List<FSVolume> volumeList = new ArrayList<FSVolume>(getVolumes());
+      
+      for (int idx = 0; idx < volumeList.size(); idx++) {
+        FSVolume fsv = volumeList.get(idx);
         try {
           fsv.checkDirs();
         } catch (DiskErrorException e) {
@@ -592,21 +879,21 @@ public class FSDataset implements FSCons
           if (removedVols == null) {
             removedVols = new ArrayList<FSVolume>(1);
           }
-          removedVols.add(volumes[idx]);
-          volumes[idx] = null; // Remove the volume
+          removedVols.add(volumeList.get(idx));
+          volumeList.set(idx, null); // Remove the volume
+          numFailedVolumes++;
         }
       }
       
       // Remove null volumes from the volumes array
       if (removedVols != null && removedVols.size() > 0) {
-        FSVolume newVols[] = new FSVolume[volumes.length - removedVols.size()];
-        int i = 0;
-        for (FSVolume vol : volumes) {
+        List<FSVolume> newVols = new ArrayList<FSVolume>();
+        for (FSVolume vol : volumeList) {
           if (vol != null) {
-            newVols[i++] = vol;
+            newVols.add(vol);
           }
         }
-        volumes = newVols; // Replace array of volumes
+        volumes = Collections.unmodifiableList(newVols); // Replace volume list
         DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
             + removedVols.size() + " volumes. List of current volumes: "
             + this);
@@ -616,22 +903,45 @@ public class FSDataset implements FSCons
     }
       
     public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (int idx = 0; idx < volumes.length; idx++) {
-        sb.append(volumes[idx].toString());
-        if (idx != volumes.length - 1) { sb.append(","); }
-      }
-      return sb.toString();
+      return volumes.toString();
     }
 
-    public boolean isValid(FSVolume volume) {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        if (volumes[idx] == volume) {
+    boolean isValid(FSVolume volume) {
+      for (FSVolume vol : volumes) {
+        if (vol == volume) {
           return true;
         }
       }
       return false;
     }
+
+    private void addBlockPool(String bpid, Configuration conf)
+        throws IOException {
+      for (FSVolume v : volumes) {
+        v.addBlockPool(bpid, conf);
+      }
+    }
+    
+    private void removeBlockPool(String bpid) {
+      for (FSVolume v : volumes) {
+        v.shutdownBlockPool(bpid);
+      }
+    }
+    
+    /**
+     * @return unmodifiable list of volumes
+     */
+    public List<FSVolume> getVolumes() {
+      return volumes;
+    }
+
+    private void shutdown() {
+      for (FSVolume volume : volumes) {
+        if(volume != null) {
+          volume.shutdown();
+        }
+      }
+    }
   }
   
   //////////////////////////////////////////////////////
@@ -664,12 +974,12 @@ public class FSDataset implements FSCons
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
   }
   
-  static File getMetaFile(File f , Block b) {
-    return new File(getMetaFileName(f.getAbsolutePath(),
-                                    b.getGenerationStamp())); 
+  static File getMetaFile(File f , long genStamp) {
+    return new File(getMetaFileName(f.getAbsolutePath(), genStamp));
   }
-  protected File getMetaFile(Block b) throws IOException {
-    return getMetaFile(getBlockFile(b), b);
+  
+  protected File getMetaFile(ExtendedBlock b) throws IOException {
+    return getMetaFile(getBlockFile(b), b.getGenerationStamp());
   }
 
   /** Find the metadata file for the specified block file.
@@ -728,13 +1038,14 @@ public class FSDataset implements FSCons
   }
 
   /** Return the block file for the given ID */ 
-  public File findBlockFile(long blockId) {
-    return getFile(blockId);
+  public File findBlockFile(String bpid, long blockId) {
+    return getFile(bpid, blockId);
   }
 
   @Override // FSDatasetInterface
-  public synchronized Block getStoredBlock(long blkid) throws IOException {
-    File blockfile = findBlockFile(blkid);
+  public synchronized Block getStoredBlock(String bpid, long blkid)
+      throws IOException {
+    File blockfile = findBlockFile(bpid, blkid);
     if (blockfile == null) {
       return null;
     }
@@ -749,8 +1060,8 @@ public class FSDataset implements FSCons
    * @param blockId
    * @return
    */
-  synchronized ReplicaInfo fetchReplicaInfo(long blockId) {
-    ReplicaInfo r = volumeMap.get(blockId);
+  ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
+    ReplicaInfo r = volumeMap.get(bpid, blockId);
     if(r == null)
       return null;
     switch(r.getState()) {
@@ -769,20 +1080,20 @@ public class FSDataset implements FSCons
   }
 
   @Override // FSDatasetInterface
-  public boolean metaFileExists(Block b) throws IOException {
+  public boolean metaFileExists(ExtendedBlock b) throws IOException {
     return getMetaFile(b).exists();
   }
   
   @Override // FSDatasetInterface
-  public long getMetaDataLength(Block b) throws IOException {
-    File checksumFile = getMetaFile( b );
+  public long getMetaDataLength(ExtendedBlock b) throws IOException {
+    File checksumFile = getMetaFile(b);
     return checksumFile.length();
   }
 
   @Override // FSDatasetInterface
-  public MetaDataInputStream getMetaDataInputStream(Block b)
+  public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
-    File checksumFile = getMetaFile( b );
+    File checksumFile = getMetaFile(b);
     return new MetaDataInputStream(new FileInputStream(checksumFile),
                                                     checksumFile.length());
   }
@@ -807,17 +1118,17 @@ public class FSDataset implements FSCons
     return f;
   }
     
-  FSVolumeSet volumes;
-  private int maxBlocksPerDir = 0;
-  ReplicasMap volumeMap = new ReplicasMap();
-  static  Random random = new Random();
-  FSDatasetAsyncDiskService asyncDiskService;
-  private int validVolsRequired;
+  final FSVolumeSet volumes;
+  private final int maxBlocksPerDir;
+  final ReplicasMap volumeMap;
+  static final Random random = new Random();
+  final FSDatasetAsyncDiskService asyncDiskService;
+  private final int validVolsRequired;
 
   // Used for synchronizing access to usage stats
-  private Object statsLock = new Object();
+  private final Object statsLock = new Object();
 
-  boolean supportAppends = true;
+  final boolean supportAppends;
 
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -842,8 +1153,13 @@ public class FSDataset implements FSCons
     }
     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
+      volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(),
+          conf);
+      DataNode.LOG.info("FSDataset added volume - "
+          + storage.getStorageDir(idx).getCurrentDir());
     }
+    volumeMap = new ReplicasMap(this);
+
     BlockVolumeChoosingPolicy blockChooserImpl =
       (BlockVolumeChoosingPolicy) ReflectionUtils.newInstance(
         conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
@@ -852,6 +1168,7 @@ public class FSDataset implements FSCons
         conf);
     volumes = new FSVolumeSet(volArray, blockChooserImpl);
     volumes.getVolumeMap(volumeMap);
+
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
@@ -863,6 +1180,7 @@ public class FSDataset implements FSCons
   /**
    * Return the total space used by dfs datanode
    */
+  @Override // FSDatasetMBean
   public long getDfsUsed() throws IOException {
     synchronized(statsLock) {
       return volumes.getDfsUsed();
@@ -870,6 +1188,16 @@ public class FSDataset implements FSCons
   }
 
   /**
+   * Return the total space used by dfs datanode
+   */
+  @Override // FSDatasetMBean
+  public long getBlockPoolUsed(String bpid) throws IOException {
+    synchronized(statsLock) {
+      return volumes.getBlockPoolUsed(bpid);
+    }
+  }
+  
+  /**
    * Return true - if there are still valid volumes on the DataNode. 
    */
   @Override // FSDatasetInterface
@@ -880,6 +1208,7 @@ public class FSDataset implements FSCons
   /**
    * Return total capacity, used and unused
    */
+  @Override // FSDatasetMBean
   public long getCapacity() throws IOException {
     synchronized(statsLock) {
       return volumes.getCapacity();
@@ -889,6 +1218,7 @@ public class FSDataset implements FSCons
   /**
    * Return how many bytes can still be stored in the FSDataset
    */
+  @Override // FSDatasetMBean
   public long getRemaining() throws IOException {
     synchronized(statsLock) {
       return volumes.getRemaining();
@@ -896,18 +1226,33 @@ public class FSDataset implements FSCons
   }
 
   /**
+   * Return the number of failed volumes in the FSDataset.
+   */
+  public int getNumFailedVolumes() {
+    return volumes.numberOfFailedVolumes();
+  }
+
+  /**
    * Find the block's on-disk length
    */
   @Override // FSDatasetInterface
-  public long getLength(Block b) throws IOException {
+  public long getLength(ExtendedBlock b) throws IOException {
     return getBlockFile(b).length();
   }
 
   /**
    * Get File name for a given block.
    */
-  public synchronized File getBlockFile(Block b) throws IOException {
-    File f = validateBlockFile(b);
+  public File getBlockFile(ExtendedBlock b) throws IOException {
+    return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
+  }
+  
+  /**
+   * Get File name for a given block.
+   */
+  public synchronized File getBlockFile(String bpid, Block b)
+      throws IOException {
+    File f = validateBlockFile(bpid, b);
     if(f == null) {
       if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
         InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
@@ -918,13 +1263,14 @@ public class FSDataset implements FSCons
   }
   
   @Override // FSDatasetInterface
-  public synchronized InputStream getBlockInputStream(Block b) throws IOException {
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
     return new FileInputStream(getBlockFile(b));
   }
 
   @Override // FSDatasetInterface
-  public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
-
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+      long seekOffset) throws IOException {
     File blockFile = getBlockFile(b);
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (seekOffset > 0) {
@@ -934,16 +1280,38 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Get the meta info of a block stored in volumeMap
-   * @param b block
-   * @return the meta replica information
-   * @throws IOException if no entry is in the map or 
+   * Get the meta info of a block stored in volumeMap. To find a block,
+   * block pool Id, block Id and generation stamp must match.
+   * @param b extended block
+   * @return the meta replica information; null if block was not found
+   * @throws ReplicaNotFoundException if no entry is in the map or 
+   *                        there is a generation stamp mismatch
+   */
+  private ReplicaInfo getReplicaInfo(ExtendedBlock b)
+      throws ReplicaNotFoundException {
+    ReplicaInfo info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+    if (info == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    return info;
+  }
+  
+  /**
+   * Get the meta info of a block stored in volumeMap. Block is looked up
+   * without matching the generation stamp.
+   * @param bpid block pool Id
+   * @param blkid block Id
+   * @return the meta replica information; null if block was not found
+   * @throws ReplicaNotFoundException if no entry is in the map or 
    *                        there is a generation stamp mismatch
    */
-  private ReplicaInfo getReplicaInfo(Block b) throws IOException {
-    ReplicaInfo info = volumeMap.get(b);
+  private ReplicaInfo getReplicaInfo(String bpid, long blkid)
+      throws ReplicaNotFoundException {
+    ReplicaInfo info = volumeMap.get(bpid, blkid);
     if (info == null) {
-      throw new IOException("Block " + b + " does not exist in volumeMap.");
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
     }
     return info;
   }
@@ -952,9 +1320,8 @@ public class FSDataset implements FSCons
    * Returns handles to the block file and its metadata file
    */
   @Override // FSDatasetInterface
-  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
+  public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, 
                           long blkOffset, long ckoff) throws IOException {
-
     ReplicaInfo info = getReplicaInfo(b);
     File blockFile = info.getBlockFile();
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
@@ -980,20 +1347,16 @@ public class FSDataset implements FSCons
    * @return - true if the specified block was unlinked or the block
    *           is not in any snapshot.
    */
-  public boolean unlinkBlock(Block block, int numLinks) throws IOException {
-    ReplicaInfo info = null;
-
-    synchronized (this) {
-      info = getReplicaInfo(block);
-    }
-   return info.unlinkBlock(numLinks);
+  public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException {
+    ReplicaInfo info = getReplicaInfo(block);
+    return info.unlinkBlock(numLinks);
   }
 
   private static File moveBlockFiles(Block b, File srcfile, File destdir
       ) throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = getMetaFile(srcfile, b);
-    final File dstmeta = getMetaFile(dstfile, b);
+    final File srcmeta = getMetaFile(srcfile, b.getGenerationStamp());
+    final File dstmeta = getMetaFile(dstfile, b.getGenerationStamp());
     if (!srcmeta.renameTo(dstmeta)) {
       throw new IOException("Failed to move meta file for " + b
           + " from " + srcmeta + " to " + dstmeta);
@@ -1075,7 +1438,7 @@ public class FSDataset implements FSCons
   }
 
   @Override  // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface append(Block b,
+  public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
     // If the block was successfully finalized because all packets
     // were successfully processed at the Datanode but the ack for
@@ -1088,11 +1451,7 @@ public class FSDataset implements FSCons
       throw new IOException("The new generation stamp " + newGS + 
           " should be greater than the replica " + b + "'s generation stamp");
     }
-    ReplicaInfo replicaInfo = volumeMap.get(b);
-    if (replicaInfo == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }  
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
     DataNode.LOG.info("Appending to replica " + replicaInfo);
     if (replicaInfo.getState() != ReplicaState.FINALIZED) {
       throw new ReplicaNotFoundException(
@@ -1104,13 +1463,15 @@ public class FSDataset implements FSCons
           " expected length is " + expectedBlockLen);
     }
 
-    return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+    return append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
+        b.getNumBytes());
   }
   
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and 
    * bump its generation stamp to be the newGS
    * 
+   * @param bpid block pool Id
    * @param replicaInfo a finalized replica
    * @param newGS new generation stamp
    * @param estimateBlockLen estimate generation stamp
@@ -1118,8 +1479,9 @@ public class FSDataset implements FSCons
    * @throws IOException if moving the replica from finalized directory 
    *         to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo, 
-      long newGS, long estimateBlockLen) throws IOException {
+  private synchronized ReplicaBeingWritten append(String bpid,
+      FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
+      throws IOException {
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
     
@@ -1130,7 +1492,7 @@ public class FSDataset implements FSCons
       throw new DiskOutOfSpaceException("Insufficient space for appending to "
           + replicaInfo);
     }
-    File newBlkFile = new File(v.rbwDir, replicaInfo.getBlockName());
+    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
     File oldmeta = replicaInfo.getMetaFile();
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
@@ -1163,18 +1525,14 @@ public class FSDataset implements FSCons
     }
     
     // Replace finalized replica by a RBW replica in replicas map
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(bpid, newReplicaInfo);
     
     return newReplicaInfo;
   }
 
-  private ReplicaInfo recoverCheck(Block b, long newGS, 
+  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
       long expectedBlockLen) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
-    if (replicaInfo == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }
+    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
     
     // check state
     if (replicaInfo.getState() != ReplicaState.FINALIZED &&
@@ -1219,8 +1577,9 @@ public class FSDataset implements FSCons
     
     return replicaInfo;
   }
+  
   @Override  // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+  public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
     DataNode.LOG.info("Recover failed append to " + b);
 
@@ -1228,7 +1587,8 @@ public class FSDataset implements FSCons
 
     // change the replica's state/gs etc.
     if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
-      return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+      return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, 
+          b.getNumBytes());
     } else { //RBW
       bumpReplicaGS(replicaInfo, newGS);
       return (ReplicaBeingWritten)replicaInfo;
@@ -1236,16 +1596,17 @@ public class FSDataset implements FSCons
   }
 
   @Override // FSDatasetInterface
-  public void recoverClose(Block b, long newGS,
+  public void recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     DataNode.LOG.info("Recover failed close " + b);
     // check replica's state
-    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS,
+        expectedBlockLen);
     // bump the replica's GS
     bumpReplicaGS(replicaInfo, newGS);
     // finalize the replica if RBW
     if (replicaInfo.getState() == ReplicaState.RBW) {
-      finalizeBlock(replicaInfo);
+      finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
   }
   
@@ -1277,9 +1638,10 @@ public class FSDataset implements FSCons
   }
 
   @Override // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface createRbw(Block b)
+  public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+        b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
       " already exists in state " + replicaInfo.getState() +
@@ -1288,24 +1650,20 @@ public class FSDataset implements FSCons
     // create a new block
     FSVolume v = volumes.getNextVolume(b.getNumBytes());
     // create a rbw file to hold block in the designated volume
-    File f = v.createRbwFile(b);
+    File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile());
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
   
   @Override // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+  public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
       long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
     DataNode.LOG.info("Recover the RBW replica " + b);
 
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
-    if (replicaInfo == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }
+    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
     
     // check the replica's state
     if (replicaInfo.getState() != ReplicaState.RBW) {
@@ -1346,7 +1704,7 @@ public class FSDataset implements FSCons
   
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
-      final Block b) throws IOException {
+      final ExtendedBlock b) throws IOException {
     final long blockId = b.getBlockId();
     final long expectedGs = b.getGenerationStamp();
     final long visible = b.getNumBytes();
@@ -1356,7 +1714,7 @@ public class FSDataset implements FSCons
     final ReplicaInPipeline temp;
     {
       // get replica
-      final ReplicaInfo r = volumeMap.get(blockId);
+      final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
       if (r == null) {
         throw new ReplicaNotFoundException(
             ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
@@ -1392,21 +1750,23 @@ public class FSDataset implements FSCons
     }
     
     // move block files to the rbw directory
-    final File dest = moveBlockFiles(b, temp.getBlockFile(), v.rbwDir);
+    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(), 
+        bpslice.getRbwDir());
     // create RBW
     final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
         blockId, numBytes, expectedGs,
         v, dest.getParentFile(), Thread.currentThread());
     rbw.setBytesAcked(visible);
     // overwrite the RBW in the volume map
-    volumeMap.add(rbw);
+    volumeMap.add(b.getBlockPoolId(), rbw);
     return rbw;
   }
 
   @Override // FSDatasetInterface
-  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+  public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
           " already exists in state " + replicaInfo.getState() +
@@ -1415,10 +1775,10 @@ public class FSDataset implements FSCons
     
     FSVolume v = volumes.getNextVolume(b.getNumBytes());
     // create a temporary file to hold block in the designated volume
-    File f = v.createTmpFile(b);
+    File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile());
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     
     return newReplicaInfo;
   }
@@ -1428,7 +1788,7 @@ public class FSDataset implements FSCons
    * last checksum will be overwritten.
    */
   @Override // FSDatasetInterface
-  public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams, 
+  public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams, 
       int checksumSize) throws IOException {
     FileOutputStream file = (FileOutputStream) streams.checksumOut;
     FileChannel channel = file.getChannel();
@@ -1441,14 +1801,17 @@ public class FSDataset implements FSCons
     channel.position(newPos);
   }
 
-  synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
+  synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException {
     if ( vol == null ) {
-      vol = getReplicaInfo( blk ).getVolume();
+      ReplicaInfo replica = volumeMap.get(bpid, blk);
+      if (replica != null) {
+        vol = volumeMap.get(bpid, blk).getVolume();
+      }
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
     }
-    return vol.createTmpFile(blk);
+    return vol.createTmpFile(bpid, blk);
   }
 
   //
@@ -1463,18 +1826,18 @@ public class FSDataset implements FSCons
    * Complete the block write!
    */
   @Override // FSDatasetInterface
-  public synchronized void finalizeBlock(Block b) throws IOException {
+  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
     ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo.getState() == ReplicaState.FINALIZED) {
       // this is legal, when recovery happens on a file that has
       // been opened for append but never modified
       return;
     }
-    finalizeReplica(replicaInfo);
+    finalizeReplica(b.getBlockPoolId(), replicaInfo);
   }
   
-  private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
-  throws IOException {
+  private synchronized FinalizedReplica finalizeReplica(String bpid,
+      ReplicaInfo replicaInfo) throws IOException {
     FinalizedReplica newReplicaInfo = null;
     if (replicaInfo.getState() == ReplicaState.RUR &&
        ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == 
@@ -1489,10 +1852,10 @@ public class FSDataset implements FSCons
             " for block " + replicaInfo);
       }
 
-      File dest = v.addBlock(replicaInfo, f);
+      File dest = v.addBlock(bpid, replicaInfo, f);
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
-    volumeMap.add(newReplicaInfo);
+    volumeMap.add(bpid, newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -1500,15 +1863,16 @@ public class FSDataset implements FSCons
    * Remove the temporary block file (if any)
    */
   @Override // FSDatasetInterface
-  public synchronized void unfinalizeBlock(Block b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b);
+  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+        b.getLocalBlock());
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
-      volumeMap.remove(b);
+      volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
       
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(), 
-          replicaInfo.getMetaFile(), b)) {
+          replicaInfo.getMetaFile(), b.getLocalBlock())) {
         DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
       }
     }
@@ -1544,12 +1908,16 @@ public class FSDataset implements FSCons
    * Generates a block report from the in-memory block map.
    */
   @Override // FSDatasetInterface
-  public BlockListAsLongs getBlockReport() {
-    ArrayList<ReplicaInfo> finalized =
-      new ArrayList<ReplicaInfo>(volumeMap.size());
+  public BlockListAsLongs getBlockReport(String bpid) {
+    int size =  volumeMap.size(bpid);
+    ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
     ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
+    if (size == 0) {
+      return new BlockListAsLongs(finalized, uc);
+    }
+    
     synchronized(this) {
-      for (ReplicaInfo b : volumeMap.replicas()) {
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
         case FINALIZED:
           finalized.add(b);
@@ -1573,27 +1941,11 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Get the block list from in-memory blockmap. Note if <deepcopy>
-   * is false, reference to the block in the volumeMap is returned. This block
-   * should not be changed. Suitable synchronization using {@link FSDataset}
-   * is needed to handle concurrent modification to the block.
-   */
-  synchronized Block[] getBlockList(boolean deepcopy) {
-    Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
-    if (deepcopy) {
-      for (int i = 0; i < list.length; i++) {
-        list[i] = new Block(list[i]);
-      }
-    }
-    return list;
-  }
-
-  /**
-   * Get the list of finalized blocks from in-memory blockmap.
+   * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
-  synchronized List<Block> getFinalizedBlocks() {
-    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
-    for (ReplicaInfo b : volumeMap.replicas()) {
+  synchronized List<Block> getFinalizedBlocks(String bpid) {
+    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size(bpid));
+    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
       if(b.getState() == ReplicaState.FINALIZED) {
         finalized.add(new Block(b));
       }
@@ -1606,7 +1958,7 @@ public class FSDataset implements FSCons
    * valid means finalized
    */
   @Override // FSDatasetInterface
-  public boolean isValidBlock(Block b) {
+  public boolean isValidBlock(ExtendedBlock b) {
     return isValid(b, ReplicaState.FINALIZED);
   }
 
@@ -1614,13 +1966,14 @@ public class FSDataset implements FSCons
    * Check whether the given block is a valid RBW.
    */
   @Override // {@link FSDatasetInterface}
-  public boolean isValidRbw(final Block b) {
+  public boolean isValidRbw(final ExtendedBlock b) {
     return isValid(b, ReplicaState.RBW);
   }
 
   /** Does the block exist and have the given state? */
-  private boolean isValid(final Block b, final ReplicaState state) {
-    final ReplicaInfo replicaInfo = volumeMap.get(b);
+  private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
+    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+        b.getLocalBlock());
     return replicaInfo != null
         && replicaInfo.getState() == state
         && replicaInfo.getBlockFile().exists();
@@ -1629,9 +1982,9 @@ public class FSDataset implements FSCons
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(Block b) throws IOException {
+  File validateBlockFile(String bpid, Block b) throws IOException {
     //Should we check for metadata file too?
-    File f = getFile(b);
+    File f = getFile(bpid, b);
     
     if(f != null ) {
       if(f.exists())
@@ -1661,7 +2014,7 @@ public class FSDataset implements FSCons
     }
 
     //check replica's meta file
-    final File metafile = getMetaFile(f, r);
+    final File metafile = getMetaFile(f, r.getGenerationStamp());
     if (!metafile.exists()) {
       throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
     }
@@ -1676,14 +2029,14 @@ public class FSDataset implements FSCons
    * just get rid of it.
    */
   @Override // FSDatasetInterface
-  public void invalidate(Block invalidBlks[]) throws IOException {
+  public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
     boolean error = false;
     for (int i = 0; i < invalidBlks.length; i++) {
       File f = null;
       FSVolume v;
       synchronized (this) {
-        f = getFile(invalidBlks[i]);
-        ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
+        f = getFile(bpid, invalidBlks[i]);
+        ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
         if (dinfo == null || 
             dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
@@ -1722,15 +2075,16 @@ public class FSDataset implements FSCons
             (replicaState == ReplicaState.RUR && 
                 ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() == 
                   ReplicaState.FINALIZED)) {
-          v.clearPath(parent);
+          v.clearPath(bpid, parent);
         }
-        volumeMap.remove(invalidBlks[i]);
+        volumeMap.remove(bpid, invalidBlks[i]);
       }
-      File metaFile = getMetaFile( f, invalidBlks[i] );
+      File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
       long dfsBytes = f.length() + metaFile.length();
       
       // Delete the block asynchronously to make sure we can do it fast enough
-      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
+      asyncDiskService.deleteAsync(v, bpid, f, metaFile, dfsBytes,
+          invalidBlks[i].toString());
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
@@ -1740,17 +2094,18 @@ public class FSDataset implements FSCons
   /**
    * Turn the block identifier into a filename; ignore generation stamp!!!
    */
-  public synchronized File getFile(Block b) {
-    return getFile(b.getBlockId());
+  public synchronized File getFile(String bpid, Block b) {
+    return getFile(bpid, b.getBlockId());
   }
 
   /**
    * Turn the block identifier into a filename
+   * @param bpid Block pool Id
    * @param blockId a block's id
    * @return on disk data file path; null if the replica does not exist
    */
-  private File getFile(long blockId) {
-    ReplicaInfo info = volumeMap.get(blockId);
+  private File getFile(String bpid, long blockId) {
+    ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info != null) {
       return info.getBlockFile();
     }
@@ -1775,20 +2130,19 @@ public class FSDataset implements FSCons
     // Otherwise remove blocks for the failed volumes
     long mlsec = System.currentTimeMillis();
     synchronized (this) {
-      Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
-      while (ib.hasNext()) {
-        ReplicaInfo b = ib.next();
-        totalBlocks++;
-        // check if the volume block belongs to still valid
-        FSVolume vol = b.getVolume();
-        for (FSVolume fv: failedVols) {
-          if (vol == fv) {
-            DataNode.LOG.warn("Removing replica info for block " + 
-              b.getBlockId() + " on failed volume " + 
-              vol.dataDir.dir.getAbsolutePath());
-            ib.remove();
-            removedBlocks++;
-            break;
+      for (FSVolume fv: failedVols) {
+        for (String bpid : fv.map.keySet()) {
+          Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
+          while(ib.hasNext()) {
+            ReplicaInfo b = ib.next();
+            totalBlocks++;
+            // check if the volume block belongs to still valid
+            if(b.getVolume() == fv) {
+              DataNode.LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
+                  + " on failed volume " + fv.currentDir.getAbsolutePath());
+              ib.remove();
+              removedBlocks++;
+            }
           }
         }
       }
@@ -1800,7 +2154,7 @@ public class FSDataset implements FSCons
     // report the error
     StringBuilder sb = new StringBuilder();
     for (FSVolume fv : failedVols) {
-      sb.append(fv.dataDir.dir.getAbsolutePath() + ";");
+      sb.append(fv.currentDir.getAbsolutePath() + ";");
     }
 
     throw  new DiskErrorException("DataNode failed volumes:" + sb);
@@ -1850,14 +2204,11 @@ public class FSDataset implements FSCons
     }
     
     if(volumes != null) {
-      for (FSVolume volume : volumes.volumes) {
-        if(volume != null) {
-          volume.dfsUsage.shutdown();
-        }
-      }
+      volumes.shutdown();
     }
   }
 
+  @Override // FSDatasetMBean
   public String getStorageInfo() {
     return toString();
   }
@@ -1885,13 +2236,13 @@ public class FSDataset implements FSCons
    * @param diskMetaFile Metadata file from on the disk
    * @param vol Volume of the block file
    */
-  public void checkAndUpdate(long blockId, File diskFile,
+  public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
     DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     synchronized (this) {
-      memBlockInfo = volumeMap.get(blockId);
+      memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
@@ -1915,9 +2266,9 @@ public class FSDataset implements FSCons
         if (!memBlockInfo.getBlockFile().exists()) {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
-          volumeMap.remove(blockId);
+          volumeMap.remove(bpid, blockId);
           if (datanode.blockScanner != null) {
-            datanode.blockScanner.deleteBlock(new Block(blockId));
+            datanode.blockScanner.deleteBlock(bpid, new Block(blockId));
           }
           DataNode.LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
@@ -1937,9 +2288,9 @@ public class FSDataset implements FSCons
         // Block is missing in memory - add the block to volumeMap
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
-        volumeMap.add(diskBlockInfo);
+        volumeMap.add(bpid, diskBlockInfo);
         if (datanode.blockScanner != null) {
-          datanode.blockScanner.addBlock(diskBlockInfo);
+          datanode.blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
         }
         DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
         return;
@@ -1975,7 +2326,8 @@ public class FSDataset implements FSCons
 
       // Compare generation stamp
       if (memBlockInfo.getGenerationStamp() != diskGS) {
-        File memMetaFile = getMetaFile(diskFile, memBlockInfo);
+        File memMetaFile = getMetaFile(diskFile, 
+            memBlockInfo.getGenerationStamp());
         if (memMetaFile.exists()) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
             DataNode.LOG.warn("Metadata file in memory "
@@ -2010,12 +2362,10 @@ public class FSDataset implements FSCons
 
     // Send corrupt block report outside the lock
     if (corruptBlock != null) {
-      DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
-      LocatedBlock[] blocks = { new LocatedBlock(corruptBlock, dnArr) };
+      DataNode.LOG.warn("Reporting the block " + corruptBlock
+          + " as corrupt due to length mismatch");
       try {
-        datanode.namenode.reportBadBlocks(blocks);
-        DataNode.LOG.warn("Reporting the block " + corruptBlock
-            + " as corrupt due to length mismatch");
+        datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock));  
       } catch (IOException e) {
         DataNode.LOG.warn("Failed to repot bad block " + corruptBlock
             + "Exception:" + StringUtils.stringifyException(e));
@@ -2024,32 +2374,31 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * @deprecated use {@link #fetchReplicaInfo(long)} instead.
+   * @deprecated use {@link #fetchReplicaInfo(String, long)} instead.
    */
   @Override // FSDatasetInterface
   @Deprecated
-  public ReplicaInfo getReplica(long blockId) {
-    assert(Thread.holdsLock(this));
-    return volumeMap.get(blockId);
+  public ReplicaInfo getReplica(String bpid, long blockId) {
+    return volumeMap.get(bpid, blockId);
   }
 
   @Override 
-  public synchronized String getReplicaString(long blockId) {
-    final Replica r = volumeMap.get(blockId);
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    final Replica r = volumeMap.get(bpid, blockId);
     return r == null? "null": r.toString();
   }
 
   @Override // FSDatasetInterface
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
-    return initReplicaRecovery(
-        volumeMap, rBlock.getBlock(), rBlock.getNewGenerationStamp());
+    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
+        volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
   }
 
   /** static version of {@link #initReplicaRecovery(Block, long)}. */
-  static ReplicaRecoveryInfo initReplicaRecovery(
+  static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
       ReplicasMap map, Block block, long recoveryId) throws IOException {
-    final ReplicaInfo replica = map.get(block.getBlockId());
+    final ReplicaInfo replica = map.get(bpid, block.getBlockId());
     DataNode.LOG.info("initReplicaRecovery: block=" + block
         + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
@@ -2104,7 +2453,7 @@ public class FSDataset implements FSCons
     }
     else {
       rur = new ReplicaUnderRecovery(replica, recoveryId);
-      map.add(rur);
+      map.add(bpid, rur);
       DataNode.LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
           + " to " + rur.getState());
@@ -2114,11 +2463,12 @@ public class FSDataset implements FSCons
 
   @Override // FSDatasetInterface
   public synchronized ReplicaInfo updateReplicaUnderRecovery(
-                                    final Block oldBlock,
+                                    final ExtendedBlock oldBlock,
                                     final long recoveryId,
                                     final long newlength) throws IOException {
     //get replica
-    final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockId());
+    final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockPoolId(), 
+        oldBlock.getBlockId());
     DataNode.LOG.info("updateReplica: block=" + oldBlock
         + ", recoveryId=" + recoveryId
         + ", length=" + newlength
@@ -2146,8 +2496,8 @@ public class FSDataset implements FSCons
     checkReplicaFiles(replica);
 
     //update replica
-    final FinalizedReplica finalized = updateReplicaUnderRecovery(
-        (ReplicaUnderRecovery)replica, recoveryId, newlength);
+    final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
+        .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength);
 
     //check replica files after update
     checkReplicaFiles(finalized);
@@ -2155,6 +2505,7 @@ public class FSDataset implements FSCons
   }
 
   private FinalizedReplica updateReplicaUnderRecovery(
+                                          String bpid,
                                           ReplicaUnderRecovery rur,
                                           long recoveryId,
                                           long newlength) throws IOException {
@@ -2181,16 +2532,14 @@ public class FSDataset implements FSCons
    }
 
     // finalize the block
-    return finalizeReplica(rur);
+    return finalizeReplica(bpid, rur);
   }
 
   @Override // FSDatasetInterface
-  public synchronized long getReplicaVisibleLength(final Block block)
+  public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    final Replica replica = volumeMap.get(block.getBlockId());
-    if (replica == null) {
-      throw new ReplicaNotFoundException(block);
-    }
+    final Replica replica = getReplicaInfo(block.getBlockPoolId(), 
+        block.getBlockId());
     if (replica.getGenerationStamp() < block.getGenerationStamp()) {
       throw new IOException(
           "replica.getGenerationStamp() < block.getGenerationStamp(), block="
@@ -2198,6 +2547,29 @@ public class FSDataset implements FSCons
     }
     return replica.getVisibleLength();
   }
+  
+  public synchronized void addBlockPool(String bpid, Configuration conf)
+      throws IOException {
+    DataNode.LOG.info("Adding block pool " + bpid);
+    volumes.addBlockPool(bpid, conf);
+    volumeMap.initBlockPool(bpid);
+    volumes.getVolumeMap(bpid, volumeMap);
+  }
+  
+  public synchronized void shutdownBlockPool(String bpid) {
+    DataNode.LOG.info("Removing block pool " + bpid);
+    volumeMap.cleanUpBlockPool(bpid);
+    volumes.removeBlockPool(bpid);
+  }
+  
+  /**
+   * get list of all bpids
+   * @return list of bpids
+   */
+  public String [] getBPIdlist() throws IOException {
+    return volumeMap.getBlockPoolList();
+  }
+  
   /**
    * Class for representing the Datanode volume information
    */
@@ -2215,28 +2587,41 @@ public class FSDataset implements FSCons
     }
   }  
   
-  synchronized Collection<VolumeInfo> getVolumeInfo() {
+  Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
-    synchronized(volumes.volumes) {
+    for (FSVolume volume : volumes.volumes) {
+      long used = 0;
+      long free = 0;
+      try {
+        used = volume.getDfsUsed();
+        free = volume.getAvailable();
+      } catch (IOException e) {
+        DataNode.LOG.warn(e.getMessage());
+        used = 0;
+        free = 0;
+      }
+      
+      info.add(new VolumeInfo(volume.toString(), used, free, 
+          volume.getReserved()));
+    }
+    return info;
+  }
+  
+  @Override //FSDatasetInterface
+  public synchronized void deleteBlockPool(String bpid, boolean force)
+      throws IOException {
+    if (!force) {
       for (FSVolume volume : volumes.volumes) {
-        long used = 0;
-        try {
-          used = volume.getDfsUsed();
-        } catch (IOException e) {
-          DataNode.LOG.warn(e.getMessage());
+        if (!volume.isBPDirEmpty(bpid)) {
+          DataNode.LOG.warn(bpid
+              + " has some block files, cannot delete unless forced");
+          throw new IOException("Cannot delete block pool, "
+              + "it contains some block files");
         }
-        
-        long free= 0;
-        try {
-          free = volume.getAvailable();
-        } catch (IOException e) {
-          DataNode.LOG.warn(e.getMessage());
-        }
-        
-        info.add(new VolumeInfo(volume.toString(), used, free, 
-            volume.getReserved()));
       }
-      return info;
+    }
+    for (FSVolume volume : volumes.volumes) {
+      volume.deleteBPDirectories(bpid, force);
     }
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Thu May  5 05:40:07 2011
@@ -147,12 +147,12 @@ class FSDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FSDataset.FSVolume volume, File blockFile,
+  void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
       File metaFile, long dfsBytes, String blockName) {
     DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
         + " for deletion");
     ReplicaFileDeleteTask deletionTask = 
-        new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
+        new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, dfsBytes,
             blockName);
     execute(volume.getCurrentDir(), deletionTask);
   }
@@ -161,16 +161,17 @@ class FSDatasetAsyncDiskService {
    *  as decrement the dfs usage of the volume. 
    */
   static class ReplicaFileDeleteTask implements Runnable {
-
-    FSDataset.FSVolume volume;
-    File blockFile;
-    File metaFile;
-    long dfsBytes;
-    String blockName;
+    final FSDataset.FSVolume volume;
+    final String blockPoolId;
+    final File blockFile;
+    final File metaFile;
+    final long dfsBytes;
+    final String blockName;
     
-    ReplicaFileDeleteTask(FSDataset.FSVolume volume, File blockFile,
-        File metaFile, long dfsBytes, String blockName) {
+    ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
+        File blockFile, File metaFile, long dfsBytes, String blockName) {
       this.volume = volume;
+      this.blockPoolId = bpid;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
       this.dfsBytes = dfsBytes;
@@ -184,18 +185,21 @@ class FSDatasetAsyncDiskService {
     @Override
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
-      return "deletion of block " + blockName + " with block file " + blockFile
-          + " and meta file " + metaFile + " from volume " + volume;
+      return "deletion of block " + blockPoolId + " " + blockName
+          + " with block file " + blockFile + " and meta file " + metaFile
+          + " from volume " + volume;
     }
 
     @Override
     public void run() {
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
-            + blockName + " at file " + blockFile + ". Ignored.");
+            + blockPoolId + " " + blockName + " at file " + blockFile
+            + ". Ignored.");
       } else {
-        volume.decDfsUsed(dfsBytes);
-        DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+        volume.decDfsUsed(blockPoolId, dfsBytes);
+        DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
+            + " at file " + blockFile);
       }
     }
   };

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu May  5 05:40:07 2011
@@ -25,11 +25,13 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -50,7 +52,7 @@ public interface FSDatasetInterface exte
    * @return the length of the metadata file for the specified block.
    * @throws IOException
    */
-  public long getMetaDataLength(Block b) throws IOException;
+  public long getMetaDataLength(ExtendedBlock b) throws IOException;
   
   /**
    * This class provides the input stream and length of the metadata
@@ -75,7 +77,7 @@ public interface FSDatasetInterface exte
    * @return the metadata input stream; 
    * @throws IOException
    */
-  public MetaDataInputStream getMetaDataInputStream(Block b)
+  public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
         throws IOException;
   
   /**
@@ -84,7 +86,7 @@ public interface FSDatasetInterface exte
   * @return true if the metafile for the specified block exists
    * @throws IOException
    */
-  public boolean metaFileExists(Block b) throws IOException;
+  public boolean metaFileExists(ExtendedBlock b) throws IOException;
 
 
   /**
@@ -93,7 +95,7 @@ public interface FSDatasetInterface exte
   * @return   the specified block's on-disk length (excluding metadata)
    * @throws IOException
    */
-  public long getLength(Block b) throws IOException;
+  public long getLength(ExtendedBlock b) throws IOException;
 
   /**
    * Get reference to the replica meta info in the replicasMap. 
@@ -102,17 +104,18 @@ public interface FSDatasetInterface exte
    * @return replica from the replicas map
    */
   @Deprecated
-  public Replica getReplica(long blockId);
+  public Replica getReplica(String bpid, long blockId);
 
   /**
    * @return replica meta information
    */
-  public String getReplicaString(long blockId);
+  public String getReplicaString(String bpid, long blockId);
 
   /**
    * @return the generation stamp stored with the block.
    */
-  public Block getStoredBlock(long blkid) throws IOException;
+  public Block getStoredBlock(String bpid, long blkid)
+      throws IOException;
 
   /**
    * Returns an input stream to read the contents of the specified block
@@ -120,7 +123,7 @@ public interface FSDatasetInterface exte
    * @return an input stream to read the contents of the specified block
    * @throws IOException
    */
-  public InputStream getBlockInputStream(Block b) throws IOException;
+  public InputStream getBlockInputStream(ExtendedBlock b) throws IOException;
   
   /**
    * Returns an input stream at specified offset of the specified block
@@ -130,7 +133,7 @@ public interface FSDatasetInterface exte
    *  starting at the offset
    * @throws IOException
    */
-  public InputStream getBlockInputStream(Block b, long seekOffset)
+  public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset)
             throws IOException;
 
   /**
@@ -143,8 +146,8 @@ public interface FSDatasetInterface exte
    *  starting at the offset
    * @throws IOException
    */
-  public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff)
-            throws IOException;
+  public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
+      long ckoff) throws IOException;
 
      /**
       * 
@@ -193,7 +196,7 @@ public interface FSDatasetInterface exte
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createTemporary(Block b)
+  public ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
   throws IOException;
 
   /**
@@ -203,7 +206,7 @@ public interface FSDatasetInterface exte
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+  public ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException;
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica
@@ -215,7 +218,7 @@ public interface FSDatasetInterface exte
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface recoverRbw(Block b, 
+  public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, 
       long newGS, long minBytesRcvd, long maxBytesRcvd)
   throws IOException;
 
@@ -225,7 +228,7 @@ public interface FSDatasetInterface exte
    * @return the result RBW
    */
   public ReplicaInPipelineInterface convertTemporaryToRbw(
-      Block temporary) throws IOException;
+      ExtendedBlock temporary) throws IOException;
 
   /**
    * Append to a finalized replica and returns the meta info of the replica
@@ -236,7 +239,7 @@ public interface FSDatasetInterface exte
+   * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public ReplicaInPipelineInterface append(Block b, 
+  public ReplicaInPipelineInterface append(ExtendedBlock b, 
       long newGS, long expectedBlockLen) throws IOException;
 
   /**
@@ -249,7 +252,7 @@ public interface FSDatasetInterface exte
    * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public ReplicaInPipelineInterface recoverAppend(Block b,
+  public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException;
   
   /**
@@ -261,7 +264,7 @@ public interface FSDatasetInterface exte
    * @param expectedBlockLen the number of bytes the replica is expected to have
    * @throws IOException
    */
-  public void recoverClose(Block b,
+  public void recoverClose(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException;
   
   /**
@@ -271,7 +274,7 @@ public interface FSDatasetInterface exte
    * @param b
    * @throws IOException
    */
-  public void finalizeBlock(Block b) throws IOException;
+  public void finalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
    * Unfinalizes the block previously opened for writing using writeToBlock.
@@ -279,34 +282,37 @@ public interface FSDatasetInterface exte
    * @param b
    * @throws IOException
    */
-  public void unfinalizeBlock(Block b) throws IOException;
+  public void unfinalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
-   * Returns the block report - the full list of blocks stored
+   * Returns the block report - the full list of blocks stored under a 
+   * block pool
+   * @param bpid Block Pool Id
    * @return - the block report - the full list of blocks stored
    */
-  public BlockListAsLongs getBlockReport();
+  public BlockListAsLongs getBlockReport(String bpid);
 
   /**
    * Is the block valid?
    * @param b
    * @return - true if the specified block is valid
    */
-  public boolean isValidBlock(Block b);
+  public boolean isValidBlock(ExtendedBlock b);
 
   /**
    * Is the block a valid RBW?
    * @param b
    * @return - true if the specified block is a valid RBW
    */
-  public boolean isValidRbw(Block b);
+  public boolean isValidRbw(ExtendedBlock b);
 
   /**
    * Invalidates the specified blocks
+   * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
    */
-  public void invalidate(Block invalidBlks[]) throws IOException;
+  public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
 
     /**
      * Check if all the data directories are healthy
@@ -332,7 +338,7 @@ public interface FSDatasetInterface exte
    * @param checksumSize number of bytes each checksum has
    * @throws IOException
    */
-  public void adjustCrcChannelPosition(Block b, BlockWriteStreams stream, 
+  public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream, 
       int checksumSize) throws IOException;
 
   /**
@@ -345,22 +351,46 @@ public interface FSDatasetInterface exte
   /**
    * Get visible length of the specified replica.
    */
-  long getReplicaVisibleLength(final Block block) throws IOException;
+  long getReplicaVisibleLength(final ExtendedBlock block) throws IOException;
 
   /**
    * Initialize a replica recovery.
-   * 
    * @return actual state of the replica on this data-node or 
    * null if data-node does not have the replica.
    */
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
-  throws IOException;
+      throws IOException;
 
   /**
    * Update replica's generation stamp and length and finalize it.
    */
   public ReplicaInfo updateReplicaUnderRecovery(
-                                          Block oldBlock,
+                                          ExtendedBlock oldBlock,
                                           long recoveryId,
                                           long newLength) throws IOException;
+  /**
+   * add new block pool ID
+   * @param bpid Block pool Id
+   * @param conf Configuration
+   */
+  public void addBlockPool(String bpid, Configuration conf) throws IOException;
+  
+  /**
+   * Shutdown and remove the block pool from underlying storage.
+   * @param bpid Block pool Id to be removed
+   */
+  public void shutdownBlockPool(String bpid) ;
+  
+  /**
+   * Deletes the block pool directories. If force is false, directories are 
+   * deleted only if no block files exist for the block pool. If force 
+   * is true entire directory for the blockpool is deleted along with its
+   * contents.
+   * @param bpid BlockPool Id to be deleted.
+   * @param force If force is false, directories are deleted only if no
+   *        block files exist for the block pool, otherwise entire 
+   *        directory for the blockpool is deleted along with its contents.
+   * @throws IOException
+   */
+  public void deleteBlockPool(String bpid, boolean force) throws IOException;
 }

Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu May  5 05:40:07 2011
@@ -2,6 +2,7 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:713112
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
+/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1097628
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1099686

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu May  5 05:40:07 2011
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * Exception indicating that DataNode does not have a replica
@@ -43,7 +43,7 @@ public class ReplicaNotFoundException ex
     super();
   }
 
-  ReplicaNotFoundException(Block b) {
+  ReplicaNotFoundException(ExtendedBlock b) {
     super("Replica not found for " + b);
   }
   



Mime
View raw message