hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1099687 [13/15] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/pr...
Date Thu, 05 May 2011 05:40:13 GMT
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java Thu May  5 05:40:07 2011
@@ -17,14 +17,15 @@
 */
 package org.apache.hadoop.hdfs.server.common;
 
+import static org.apache.hadoop.hdfs.protocol.FSConstants.LAYOUT_VERSION;
+
 import java.io.IOException;
+
 import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-
-import static org.apache.hadoop.hdfs.protocol.FSConstants.LAYOUT_VERSION;
-
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.TestDFSUpg
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.UpgradeObjectDatanode;
 import org.apache.hadoop.hdfs.server.namenode.UpgradeObjectNamenode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 
@@ -42,6 +44,7 @@ public class TestDistributedUpgrade exte
   private Configuration conf;
   private int testCounter = 0;
   private MiniDFSCluster cluster = null;
+  private String clusterId = "testClusterId";
     
   /**
    * Writes an INFO log message containing the parameters.
@@ -64,9 +67,10 @@ public class TestDistributedUpgrade exte
       // nn dirs set to name1 and name2
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                               .format(false)
+                                              .clusterId(clusterId)
                                               .startupOption(operation)
                                               .build(); // should fail
-      throw new AssertionError("Jakob was here. NameNode should have failed to start");
+      throw new AssertionError("NameNode should have failed to start");
     } catch (Exception expected) {
       expected = null;
       // expected
@@ -94,7 +98,7 @@ public class TestDistributedUpgrade exte
     TestDFSUpgradeFromImage testImg = new TestDFSUpgradeFromImage();
     testImg.unpackStorage();
     int numDNs = testImg.numDataNodes;
-
+    
     // register new upgrade objects (ignore all existing)
     UpgradeObjectCollection.initialize();
     UpgradeObjectCollection.registerUpgrade(new UO_Datanode1());
@@ -118,6 +122,7 @@ public class TestDistributedUpgrade exte
     // .startupOption(StartupOption.UPGRADE).build();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                               .format(false)
+                                              .clusterId(clusterId)
                                               .startupOption(StartupOption.UPGRADE)
                                               .build();
     cluster.shutdown();
@@ -132,6 +137,7 @@ public class TestDistributedUpgrade exte
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(numDNs)
                                 .format(false)
+                                .clusterId(clusterId)
                                 .startupOption(StartupOption.UPGRADE)
                                 .build();
     DFSAdmin dfsAdmin = new DFSAdmin();
@@ -143,6 +149,7 @@ public class TestDistributedUpgrade exte
     log("NameCluster regular startup after the upgrade", numDirs);
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(numDNs)
+                                .clusterId(clusterId)
                                 .format(false)
                                 .startupOption(StartupOption.REGULAR)
                                 .build();
@@ -174,7 +181,8 @@ class UO_Datanode extends UpgradeObjectD
 
   public void doUpgrade() throws IOException {
     this.status = (short)100;
-    getDatanode().namenode.processUpgradeCommand(
+    DatanodeProtocol nn = getNamenode();
+    nn.processUpgradeCommand(
         new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS, 
             getVersion(), getUpgradeStatus()));
   }

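The net effect of the clusterId additions above is that every MiniDFSCluster restart in this test is pinned to the same cluster ID. A condensed sketch of that restart sequence, reusing only the builder calls that appear in the hunks above (conf, numDNs and StartupOption are the test's own fields, and the id string is illustrative):

    // Rough sketch, not the full test: the same clusterId string is passed to
    // every restart so the namenode and datanodes keep operating as one cluster
    // across the distributed upgrade and the subsequent regular startup.
    String clusterId = "testClusterId";
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(0)
        .format(false)                          // reuse the unpacked storage dirs
        .clusterId(clusterId)
        .startupOption(StartupOption.UPGRADE)
        .build();
    cluster.shutdown();

    cluster = new MiniDFSCluster.Builder(conf)  // regular restart after the upgrade
        .numDataNodes(numDNs)
        .clusterId(clusterId)
        .format(false)
        .startupOption(StartupOption.REGULAR)
        .build();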
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestJspHelper.java Thu May  5 05:40:07 2011
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +68,7 @@ public class TestJspHelper {
   public void testGetUgi() throws IOException {
     conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://localhost:4321/");
     HttpServletRequest request = mock(HttpServletRequest.class);
+    ServletContext context = mock(ServletContext.class);
     String user = "TheDoctor";
     Text userText = new Text(user);
     DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
@@ -78,19 +80,44 @@ public class TestJspHelper {
         tokenString);
     when(request.getRemoteUser()).thenReturn(user);
 
+    //Test attribute in the url to be used as service in the token.
+    when(request.getParameter(JspHelper.NAMENODE_ADDRESS)).thenReturn(
+        "1.1.1.1:1111");
+
     conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     UserGroupInformation.setConfiguration(conf);
 
-    InetSocketAddress serviceAddr = NameNode.getAddress(conf);
-    Text tokenService = new Text(serviceAddr.getAddress().getHostAddress()
-        + ":" + serviceAddr.getPort());
-
-    UserGroupInformation ugi = JspHelper.getUGI(request, conf);
+    verifyServiceInToken(context, request, "1.1.1.1:1111");
+    
+    //Test attribute name.node.address 
+    //Set the nnaddr url parameter to null.
+    when(request.getParameter(JspHelper.NAMENODE_ADDRESS)).thenReturn(null);
+    InetSocketAddress addr = new InetSocketAddress("localhost", 2222);
+    when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+        .thenReturn(addr);
+    verifyServiceInToken(context, request, addr.getAddress().getHostAddress()
+        + ":2222");
+    
+    //Test service already set in the token
+    token.setService(new Text("3.3.3.3:3333"));
+    tokenString = token.encodeToUrlString();
+    //Set the name.node.address attribute in Servlet context to null
+    when(context.getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY))
+        .thenReturn(null);
+    when(request.getParameter(JspHelper.DELEGATION_PARAMETER_NAME)).thenReturn(
+        tokenString);
+    verifyServiceInToken(context, request, "3.3.3.3:3333");
+  }
+  
+  private void verifyServiceInToken(ServletContext context,
+      HttpServletRequest request, String expected) throws IOException {
+    UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
     Token<? extends TokenIdentifier> tokenInUgi = ugi.getTokens().iterator()
         .next();
-    Assert.assertEquals(tokenInUgi.getService(), tokenService);
+    Assert.assertEquals(tokenInUgi.getService().toString(), expected);
   }
   
+  
   @Test
   public void testDelegationTokenUrlParam() {
     conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");

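The three verifyServiceInToken calls above exercise a fallback order for the delegation token's service address: the nnaddr request parameter wins, then the NameNode address stored as a ServletContext attribute, and finally whatever service the token already carries. A stand-alone sketch of that order (plain Java; resolveTokenService is a hypothetical helper written for illustration, not part of JspHelper):

    import java.net.InetSocketAddress;

    class TokenServiceResolution {
      // Hypothetical helper mirroring the order the test verifies.
      static String resolveTokenService(String nnAddrParam,        // "nnaddr" URL parameter
                                        InetSocketAddress ctxAddr, // ServletContext attribute
                                        String serviceInToken) {   // service already in the token
        if (nnAddrParam != null) {
          return nnAddrParam;                                      // e.g. "1.1.1.1:1111"
        }
        if (ctxAddr != null) {
          return ctxAddr.getAddress().getHostAddress() + ":" + ctxAddr.getPort();
        }
        return serviceInToken;                                     // e.g. "3.3.3.3:3333"
      }
    }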
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java Thu May  5 05:40:07 2011
@@ -28,11 +28,13 @@ public class DataNodeAdapter {
   /**
    * Fetch a copy of ReplicaInfo from a datanode by block id
    * @param dn datanode to retrieve a replicainfo object from
+   * @param bpid Block pool Id
    * @param blkId id of the replica's block
    * @return copy of ReplicaInfo object @link{FSDataset#fetchReplicaInfo}
    */
   public static ReplicaInfo fetchReplicaInfo (final DataNode dn,
+                                              final String bpid,
                                               final long blkId) {
-    return ((FSDataset)dn.data).fetchReplicaInfo(blkId);
+    return ((FSDataset)dn.data).fetchReplicaInfo(bpid, blkId);
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu May  5 05:40:07 2011
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
@@ -34,8 +35,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,13 +95,14 @@ public class SimulatedFSDataset  impleme
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
-    BInfo(Block b, boolean forWriting) throws IOException {
+    BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
         theBlock.setNumBytes(0);
       }
-      if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may
-                                          // be more - we find out at finalize
+      if (!storage.alloc(bpid, theBlock.getNumBytes())) { 
+        // expected length - actual length may
+        // be more - we find out at finalize
         DataNode.LOG.warn("Lack of free storage on a block alloc");
         throw new IOException("Creating block, no free space available");
       }
@@ -140,7 +145,8 @@ public class SimulatedFSDataset  impleme
       }
     }
     
-    synchronized void finalizeBlock(long finalSize) throws IOException {
+    synchronized void finalizeBlock(String bpid, long finalSize)
+        throws IOException {
       if (finalized) {
         throw new IOException(
             "Finalizing a block that has already been finalized" + 
@@ -161,12 +167,12 @@ public class SimulatedFSDataset  impleme
       // adjust if necessary
       long extraLen = finalSize - theBlock.getNumBytes();
       if (extraLen > 0) {
-        if (!storage.alloc(extraLen)) {
+        if (!storage.alloc(bpid, extraLen)) {
           DataNode.LOG.warn("Lack of free storage on a block alloc");
           throw new IOException("Creating block, no free space available");
         }
       } else {
-        storage.free(-extraLen);
+        storage.free(bpid, -extraLen);
       }
       theBlock.setNumBytes(finalSize);  
 
@@ -259,12 +265,41 @@ public class SimulatedFSDataset  impleme
     }
   }
   
-  static private class SimulatedStorage {
-    private long capacity;  // in bytes
+  /**
+   * Class is used for tracking block pool storage utilization similar
+   * to {@link BlockPoolSlice}
+   */
+  private static class SimulatedBPStorage {
     private long used;    // in bytes
     
+    long getUsed() {
+      return used;
+    }
+    
+    void alloc(long amount) {
+      used += amount;
+    }
+    
+    void free(long amount) {
+      used -= amount;
+    }
+    
+    SimulatedBPStorage() {
+      used = 0;   
+    }
+  }
+  
+  /**
+   * Class used for tracking datanode level storage utilization similar
+   * to {@link FSVolumeSet}
+   */
+  private static class SimulatedStorage {
+    private Map<String, SimulatedBPStorage> map = 
+      new HashMap<String, SimulatedBPStorage>();
+    private long capacity;  // in bytes
+    
     synchronized long getFree() {
-      return capacity - used;
+      return capacity - getUsed();
     }
     
     synchronized long getCapacity() {
@@ -272,29 +307,59 @@ public class SimulatedFSDataset  impleme
     }
     
     synchronized long getUsed() {
+      long used = 0;
+      for (SimulatedBPStorage bpStorage : map.values()) {
+        used += bpStorage.getUsed();
+      }
       return used;
     }
     
-    synchronized boolean alloc(long amount) {
+    synchronized long getBlockPoolUsed(String bpid) throws IOException {
+      return getBPStorage(bpid).getUsed();
+    }
+    
+    int getNumFailedVolumes() {
+      return 0;
+    }
+
+    synchronized boolean alloc(String bpid, long amount) throws IOException {
       if (getFree() >= amount) {
-        used += amount;
+        getBPStorage(bpid).alloc(amount);
         return true;
-      } else {
-        return false;    
       }
+      return false;    
     }
     
-    synchronized void free(long amount) {
-      used -= amount;
+    synchronized void free(String bpid, long amount) throws IOException {
+      getBPStorage(bpid).free(amount);
     }
     
     SimulatedStorage(long cap) {
       capacity = cap;
-      used = 0;   
+    }
+    
+    synchronized void addBlockPool(String bpid) {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage != null) {
+        return;
+      }
+      map.put(bpid, new SimulatedBPStorage());
+    }
+    
+    synchronized void removeBlockPool(String bpid) {
+      map.remove(bpid);
+    }
+    
+    private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage == null) {
+        throw new IOException("block pool " + bpid + " not found");
+      }
+      return bpStorage;
     }
   }
   
-  private HashMap<Block, BInfo> blockMap = null;
+  private Map<String, Map<Block, BInfo>> blockMap = null;
   private SimulatedStorage storage = null;
   private String storageId;
   
@@ -302,7 +367,9 @@ public class SimulatedFSDataset  impleme
     setConf(conf);
   }
   
-  private SimulatedFSDataset() { // real construction when setConf called.. Uggg
+  // Constructor used for constructing the object using reflection
+  @SuppressWarnings("unused")
+  private SimulatedFSDataset() { // real construction when setConf called..
   }
   
   public Configuration getConf() {
@@ -316,14 +383,12 @@ public class SimulatedFSDataset  impleme
     registerMBean(storageId);
     storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
-    //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + 
-    //    "Used = " + getDfsUsed() + "Free =" + getRemaining());
-
-    blockMap = new HashMap<Block,BInfo>(); 
+    blockMap = new HashMap<String, Map<Block,BInfo>>(); 
   }
 
-  public synchronized void injectBlocks(Iterable<Block> injectBlocks)
-                                            throws IOException {
+  public synchronized void injectBlocks(String bpid,
+      Iterable<Block> injectBlocks) throws IOException {
+    ExtendedBlock blk = new ExtendedBlock();
     if (injectBlocks != null) {
       int numInjectedBlocks = 0;
       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -331,69 +396,100 @@ public class SimulatedFSDataset  impleme
         if (b == null) {
           throw new NullPointerException("Null blocks in block list");
         }
-        if (isValidBlock(b)) {
+        blk.set(bpid, b);
+        if (isValidBlock(blk)) {
           throw new IOException("Block already exists in  block list");
         }
       }
-      HashMap<Block, BInfo> oldBlockMap = blockMap;
-      blockMap = new HashMap<Block,BInfo>(
-          numInjectedBlocks + oldBlockMap.size());
-      blockMap.putAll(oldBlockMap);
+      Map<Block, BInfo> map = blockMap.get(bpid);
+      if (map == null) {
+        map = new HashMap<Block, BInfo>();
+        blockMap.put(bpid, map);
+      }
+      
       for (Block b: injectBlocks) {
-          BInfo binfo = new BInfo(b, false);
-          blockMap.put(binfo.theBlock, binfo);
+        BInfo binfo = new BInfo(bpid, b, false);
+        map.put(binfo.theBlock, binfo);
       }
     }
   }
+  
+  /** Get a map for a given block pool Id */
+  private Map<Block, BInfo> getMap(String bpid) throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map == null) {
+      throw new IOException("Non existent blockpool " + bpid);
+    }
+    return map;
+  }
 
-  @Override
-  public synchronized void finalizeBlock(Block b) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
-    binfo.finalizeBlock(b.getNumBytes());
-
+    binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes());
   }
 
-  @Override
-  public synchronized void unfinalizeBlock(Block b) throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
     if (isValidRbw(b)) {
-      blockMap.remove(b);
+      getMap(b.getBlockPoolId()).remove(b.getLocalBlock());
     }
   }
 
   @Override
-  public synchronized BlockListAsLongs getBlockReport() {
-    Block[] blockTable = new Block[blockMap.size()];
-    int count = 0;
-    for (BInfo b : blockMap.values()) {
-      if (b.isFinalized()) {
-        blockTable[count++] = b.theBlock;
+  public synchronized BlockListAsLongs getBlockReport(String bpid) {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    Block[] blockTable;
+    if (map != null) {
+      blockTable = new Block[map.size()];
+      int count = 0;
+      for (BInfo b : map.values()) {
+        if (b.isFinalized()) {
+          blockTable[count++] = b.theBlock;
+        }
       }
-    }
-    if (count != blockTable.length) {
-      blockTable = Arrays.copyOf(blockTable, count);
+      if (count != blockTable.length) {
+        blockTable = Arrays.copyOf(blockTable, count);
+      }
+    } else {
+      blockTable = new Block[0];
     }
     return new BlockListAsLongs(
         new ArrayList<Block>(Arrays.asList(blockTable)), null);
   }
 
+  @Override // FSDatasetMBean
   public long getCapacity() throws IOException {
     return storage.getCapacity();
   }
 
+  @Override // FSDatasetMBean
   public long getDfsUsed() throws IOException {
     return storage.getUsed();
   }
 
+  @Override // FSDatasetMBean
+  public long getBlockPoolUsed(String bpid) throws IOException {
+    return storage.getBlockPoolUsed(bpid);
+  }
+  
+  @Override // FSDatasetMBean
   public long getRemaining() throws IOException {
     return storage.getFree();
   }
 
-  @Override
-  public synchronized long getLength(Block b) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetMBean
+  public int getNumFailedVolumes() {
+    return storage.getNumFailedVolumes();
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized long getLength(ExtendedBlock b) throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -402,65 +498,84 @@ public class SimulatedFSDataset  impleme
 
   @Override
   @Deprecated
-  public Replica getReplica(long blockId) {
-    return blockMap.get(new Block(blockId));
+  public Replica getReplica(String bpid, long blockId) {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      return map.get(new Block(blockId));
+    }
+    return null;
   }
 
   @Override 
-  public synchronized String getReplicaString(long blockId) {
-    final Replica r = blockMap.get(new Block(blockId));
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    Replica r = null;
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      r = map.get(new Block(blockId));
+    }
     return r == null? "null": r.toString();
   }
 
-  @Override
-  public Block getStoredBlock(long blkid) throws IOException {
-    Block b = new Block(blkid);
-    BInfo binfo = blockMap.get(b);
-    if (binfo == null) {
-      return null;
+  @Override // FSDatasetInterface
+  public Block getStoredBlock(String bpid, long blkid) throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      BInfo binfo = map.get(new Block(blkid));
+      if (binfo == null) {
+        return null;
+      }
+      return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
     }
-    b.setGenerationStamp(binfo.getGenerationStamp());
-    b.setNumBytes(binfo.getNumBytes());
-    return b;
+    return null;
   }
 
-  @Override
-  public synchronized void invalidate(Block[] invalidBlks) throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized void invalidate(String bpid, Block[] invalidBlks)
+      throws IOException {
     boolean error = false;
     if (invalidBlks == null) {
       return;
     }
+    final Map<Block, BInfo> map = getMap(bpid);
     for (Block b: invalidBlks) {
       if (b == null) {
         continue;
       }
-      BInfo binfo = blockMap.get(b);
+      BInfo binfo = map.get(b);
       if (binfo == null) {
         error = true;
         DataNode.LOG.warn("Invalidate: Missing block");
         continue;
       }
-      storage.free(binfo.getNumBytes());
+      storage.free(bpid, binfo.getNumBytes());
-      blockMap.remove(b);
+      map.remove(b);
     }
-      if (error) {
-          throw new IOException("Invalidate: Missing blocks.");
-      }
+    if (error) {
+      throw new IOException("Invalidate: Missing blocks.");
+    }
   }
 
-  @Override
-  public synchronized boolean isValidBlock(Block b) {
-    // return (blockMap.containsKey(b));
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized boolean isValidBlock(ExtendedBlock b) {
+    final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
+    if (map == null) {
+      return false;
+    }
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       return false;
     }
     return binfo.isFinalized();
   }
 
+  /* check if a block is created but not finalized */
   @Override
-  public synchronized boolean isValidRbw(Block b) {
-    BInfo binfo = blockMap.get(b);
+  public synchronized boolean isValidRbw(ExtendedBlock b) {
+    final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
+    if (map == null) {
+      return false;
+    }
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       return false;
     }
@@ -472,10 +587,11 @@ public class SimulatedFSDataset  impleme
     return getStorageInfo();
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface append(Block b,
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null || !binfo.isFinalized()) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -484,10 +600,11 @@ public class SimulatedFSDataset  impleme
     return binfo;
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -495,32 +612,34 @@ public class SimulatedFSDataset  impleme
     if (binfo.isFinalized()) {
       binfo.unfinalizeBlock();
     }
-    blockMap.remove(b);
+    map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
-  @Override
-  public void recoverClose(Block b, long newGS,
-      long expectedBlockLen) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+      throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
     }
     if (!binfo.isFinalized()) {
-      binfo.finalizeBlock(binfo.getNumBytes());
+      binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes());
     }
-    blockMap.remove(b);
+    map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
   }
   
-  @Override
-  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
       long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if ( binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
           + " does not exist, and cannot be appended to.");
@@ -529,20 +648,20 @@ public class SimulatedFSDataset  impleme
       throw new ReplicaAlreadyExistsException("Block " + b
           + " is valid, and cannot be written to.");
     }
-    blockMap.remove(b);
+    map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
-    blockMap.put(binfo.theBlock, binfo);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface createRbw(Block b) 
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) 
   throws IOException {
     return createTemporary(b);
   }
 
-  @Override
-  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
       throws IOException {
     if (isValidBlock(b)) {
           throw new ReplicaAlreadyExistsException("Block " + b + 
@@ -552,35 +671,36 @@ public class SimulatedFSDataset  impleme
         throw new ReplicaAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
-    BInfo binfo = new BInfo(b, true);
-    blockMap.put(binfo.theBlock, binfo);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
+    map.put(binfo.theBlock, binfo);
     return binfo;
   }
 
-  @Override
-  public synchronized InputStream getBlockInputStream(Block b)
-                                            throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
     
-    //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
     return binfo.getIStream();
   }
   
-  @Override
-  public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
-                              throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+      long seekOffset) throws IOException {
     InputStream result = getBlockInputStream(b);
     result.skip(seekOffset);
     return result;
   }
 
   /** Not supported */
-  @Override
-  public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
-      ) throws IOException {
+  @Override // FSDatasetInterface
+  public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
+      long ckoff) throws IOException {
     throw new IOException("Not supported");
   }
 
@@ -591,9 +711,10 @@ public class SimulatedFSDataset  impleme
    * @throws IOException - block does not exist or problems accessing
    *  the meta file
    */
-  private synchronized InputStream getMetaDataInStream(Block b)
+  private synchronized InputStream getMetaDataInStream(ExtendedBlock b)
                                               throws IOException {
-    BInfo binfo = blockMap.get(b);
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -604,9 +725,11 @@ public class SimulatedFSDataset  impleme
     return binfo.getMetaIStream();
   }
  
-  @Override
-  public synchronized long getMetaDataLength(Block b) throws IOException {
-    BInfo binfo = blockMap.get(b);
+  @Override // FSDatasetInterface
+  public synchronized long getMetaDataLength(ExtendedBlock b)
+      throws IOException {
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -617,16 +740,15 @@ public class SimulatedFSDataset  impleme
     return binfo.getMetaIStream().getLength();
   }
   
-  @Override
-  public MetaDataInputStream getMetaDataInputStream(Block b)
-  throws IOException {
-
-       return new MetaDataInputStream(getMetaDataInStream(b),
-                                                getMetaDataLength(b));
+  @Override // FSDatasetInterface
+  public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
+      throws IOException {
+     return new MetaDataInputStream(getMetaDataInStream(b), 
+                                    getMetaDataLength(b));
   }
 
-  @Override
-  public synchronized boolean metaFileExists(Block b) throws IOException {
+  @Override // FSDatasetInterface
+  public synchronized boolean metaFileExists(ExtendedBlock b) throws IOException {
     if (!isValidBlock(b)) {
           throw new IOException("Block " + b +
               " is valid, and cannot be written to.");
@@ -638,8 +760,8 @@ public class SimulatedFSDataset  impleme
     // nothing to check for simulated data set
   }
 
-  @Override
-  public synchronized void adjustCrcChannelPosition(Block b,
+  @Override // FSDatasetInterface
+  public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
                                               BlockWriteStreams stream, 
                                               int checksumSize)
                                               throws IOException {
@@ -812,8 +934,9 @@ public class SimulatedFSDataset  impleme
   @Override
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
   throws IOException {
-    Block b = rBlock.getBlock();
-    BInfo binfo = blockMap.get(b);
+    ExtendedBlock b = rBlock.getBlock();
+    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );  
     }
@@ -824,22 +947,44 @@ public class SimulatedFSDataset  impleme
   }
 
   @Override // FSDatasetInterface
-  public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+  public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                         long recoveryId,
                                         long newlength) throws IOException {
     return new FinalizedReplica(
         oldBlock.getBlockId(), newlength, recoveryId, null, null);
   }
 
-  @Override
-  public long getReplicaVisibleLength(Block block) throws IOException {
+  @Override // FSDatasetInterface
+  public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
     return block.getNumBytes();
   }
 
+  @Override // FSDatasetInterface
+  public void addBlockPool(String bpid, Configuration conf) {
+    Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+    blockMap.put(bpid, map);
+    storage.addBlockPool(bpid);
+  }
+  
+  @Override // FSDatasetInterface
+  public void shutdownBlockPool(String bpid) {
+    blockMap.remove(bpid);
+    storage.removeBlockPool(bpid);
+  }
+  
+  @Override // FSDatasetInterface
+  public void deleteBlockPool(String bpid, boolean force) {
+    return;
+  }
+
   @Override
-  public ReplicaInPipelineInterface convertTemporaryToRbw(Block temporary)
+  public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
       throws IOException {
-    final BInfo r = blockMap.get(temporary);
+    final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
+    if (map == null) {
+      throw new IOException("Block pool not found, temporary=" + temporary);
+    }
+    final BInfo r = map.get(temporary.getLocalBlock());
     if (r == null) {
       throw new IOException("Block not found, temporary=" + temporary);
     } else if (r.isFinalized()) {

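The storage-side changes above split the single used counter into one SimulatedBPStorage per block pool while capacity stays global. Reduced to a stand-alone class (plain Java written for illustration, not HDFS code), the accounting pattern is roughly:

    import java.util.HashMap;
    import java.util.Map;

    class PerPoolStorage {
      private final Map<String, Long> usedPerPool = new HashMap<String, Long>();
      private final long capacity;                 // shared by all block pools

      PerPoolStorage(long capacity) { this.capacity = capacity; }

      synchronized long getUsed() {                // datanode-wide usage
        long used = 0;
        for (long u : usedPerPool.values()) used += u;
        return used;
      }

      synchronized long getFree() { return capacity - getUsed(); }

      synchronized void addPool(String bpid) {
        if (!usedPerPool.containsKey(bpid)) usedPerPool.put(bpid, 0L);
      }

      synchronized boolean alloc(String bpid, long amount) {
        Long used = usedPerPool.get(bpid);
        if (used == null || getFree() < amount) return false;
        usedPerPool.put(bpid, used + amount);      // charge the owning pool
        return true;
      }

      synchronized void free(String bpid, long amount) {
        Long used = usedPerPool.get(bpid);
        if (used != null) usedPerPool.put(bpid, used - amount);
      }
    }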
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu May  5 05:40:07 2011
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.REPLACE_BLOCK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*;
 
 import java.io.DataInputStream;
@@ -41,9 +40,9 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 /**
  * This class tests if block replacement request to data nodes work correctly.
@@ -120,7 +118,7 @@ public class TestBlockReplacement extend
       LocatedBlock block = locatedBlocks.get(0);
       DatanodeInfo[]  oldNodes = block.getLocations();
       assertEquals(oldNodes.length, 3);
-      Block b = block.getBlock();
+      ExtendedBlock b = block.getBlock();
       
       // add a new datanode to the cluster
       cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
@@ -161,11 +159,11 @@ public class TestBlockReplacement extend
       // start to replace the block
       // case 1: proxySource does not contain the block
       LOG.info("Testcase 1: Proxy " + newNode.getName() 
-          + " does not contain the block " + b.getBlockName() );
+           + " does not contain the block " + b);
       assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
       // case 2: destination contains the block
       LOG.info("Testcase 2: Destination " + proxies.get(1).getName() 
-          + " contains the block " + b.getBlockName() );
+          + " contains the block " + b);
       assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
       // case 3: correct case
       LOG.info("Testcase 3: Proxy=" + source.getName() + " source=" + 
@@ -224,7 +222,7 @@ public class TestBlockReplacement extend
    * 
    * Return true if a block is successfully copied; otherwise false.
    */
-  private boolean replaceBlock( Block block, DatanodeInfo source,
+  private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
     Socket sock = new Socket();
     sock.connect(NetUtils.createSocketAddr(
@@ -232,13 +230,8 @@ public class TestBlockReplacement extend
     sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    REPLACE_BLOCK.write(out);
-    out.writeLong(block.getBlockId());
-    out.writeLong(block.getGenerationStamp());
-    Text.writeString(out, source.getStorageID());
-    sourceProxy.write(out);
-    BlockTokenSecretManager.DUMMY_TOKEN.write(out);
+    DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+        .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Thu May  5 05:40:07 2011
@@ -28,12 +28,14 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -67,6 +69,7 @@ public class TestBlockReport {
   static final int BLOCK_SIZE = 1024;
   static final int NUM_BLOCKS = 10;
   static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
+  static String bpid;
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
@@ -85,6 +88,7 @@ public class TestBlockReport {
     REPL_FACTOR = 1; //Reset if case a test has modified the value
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
     fs = (DistributedFileSystem) cluster.getFileSystem();
+    bpid = cluster.getNamesystem().getBlockPoolId();
   }
 
   @After
@@ -130,8 +134,11 @@ public class TestBlockReport {
             b.getNumBytes());
       }
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N0).dnRegistration,
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     List<LocatedBlock> blocksAfterReport =
@@ -143,7 +150,7 @@ public class TestBlockReport {
     }
 
     for (int i = 0; i < blocksAfterReport.size(); i++) {
-      Block b = blocksAfterReport.get(i).getBlock();
+      ExtendedBlock b = blocksAfterReport.get(i).getBlock();
       assertEquals("Length of " + i + "th block is incorrect",
         oldLengths[i], b.getNumBytes());
     }
@@ -171,7 +178,7 @@ public class TestBlockReport {
     File dataDir = new File(cluster.getDataDirectory());
     assertTrue(dataDir.isDirectory());
 
-    List<Block> blocks2Remove = new ArrayList<Block>();
+    List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
     List<Integer> removedIndex = new ArrayList<Integer>();
     List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
       filePath.toString(), FILE_START,
@@ -192,7 +199,7 @@ public class TestBlockReport {
       LOG.debug("Number of blocks allocated " + lBlocks.size());
     }
 
-    for (Block b : blocks2Remove) {
+    for (ExtendedBlock b : blocks2Remove) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Removing the block " + b.getBlockName());
       }
@@ -206,8 +213,11 @@ public class TestBlockReport {
 
     waitTil(DN_RESCAN_EXTRA_WAIT);
 
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N0).dnRegistration,
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
       new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     cluster.getNamesystem().computeDatanodeWork();
@@ -241,9 +251,12 @@ public class TestBlockReport {
     blocks.get(0).setGenerationStamp(rand.nextLong());
     // This new block is unknown to NN and will be mark for deletion.
     blocks.add(new Block());
-    DatanodeCommand dnCmd =
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N0).dnRegistration,
+    
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId,
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Got the command: " + dnCmd);
@@ -291,9 +304,12 @@ public class TestBlockReport {
     ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
     startDNandWait(filePath, true);
 
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
       0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -327,8 +343,7 @@ public class TestBlockReport {
     int randIndex = rand.nextInt(blocks.size());
     // Get a block and screw its GS
     Block corruptedBlock = blocks.get(randIndex);
-    String secondNode = cluster.getDataNodes().get(DN_N1).
-      getDatanodeRegistration().getStorageID();
+    String secondNode = cluster.getDataNodes().get(DN_N1).getStorageId();
     if(LOG.isDebugEnabled()) {
       LOG.debug("Working with " + secondNode);
       LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
@@ -338,9 +353,12 @@ public class TestBlockReport {
       LOG.debug("BlockGS after " + blocks.get(randIndex).getGenerationStamp());
       LOG.debug("Done corrupting GS of " + corruptedBlock.getBlockName());
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
     assertEquals("Wrong number of Corrupted blocks",
       1, cluster.getNamesystem().getCorruptReplicaBlocks() +
@@ -360,9 +378,9 @@ public class TestBlockReport {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
     }
-    cluster.getNameNode().blockReport(
-      cluster.getDataNodes().get(DN_N1).dnRegistration,
-      new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+    
+    cluster.getNameNode().blockReport(dnR, poolId,
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     printStats();
 
     assertEquals("Wrong number of Corrupted blocks",
@@ -406,10 +424,13 @@ public class TestBlockReport {
       bc.start();
 
       waitForTempReplica(bl, DN_N1);
-
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N1).dnRegistration,
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+      
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      cluster.getNameNode().blockReport(dnR, poolId,
+          new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -450,9 +471,12 @@ public class TestBlockReport {
 
       waitForTempReplica(bl, DN_N1);
                                                 
-      cluster.getNameNode().blockReport(
-        cluster.getDataNodes().get(DN_N1).dnRegistration,
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      cluster.getNameNode().blockReport(dnR, poolId,
+          new BlockListAsLongs(blocks, null).getBlockListAsLongs());
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         2, cluster.getNamesystem().getPendingReplicationBlocks());
@@ -465,7 +489,7 @@ public class TestBlockReport {
     }
   }
 
-  private void waitForTempReplica(Block bl, int DN_N1) {
+  private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;
     final int TIMEOUT = 40000;
     
@@ -478,16 +502,18 @@ public class TestBlockReport {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
     }
+    cluster.waitActive();
+    
     // Look about specified DN for the replica of the block from 1st DN
-    Replica r;
-    r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-      fetchReplicaInfo(bl.getBlockId());
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
+      fetchReplicaInfo(bpid, bl.getBlockId());
     long start = System.currentTimeMillis();
     int count = 0;
     while (r == null) {
       waitTil(5);
       r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
-        fetchReplicaInfo(bl.getBlockId());
+        fetchReplicaInfo(bpid, bl.getBlockId());
       long waiting_period = System.currentTimeMillis() - start;
       if (count++ % 100 == 0)
         if(LOG.isDebugEnabled()) {
@@ -548,8 +574,8 @@ public class TestBlockReport {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("New datanode "
-          + cluster.getDataNodes().get(datanodes.size() - 1)
-          .getDatanodeRegistration() + " has been started");
+          + cluster.getDataNodes().get(datanodes.size() - 1).getMachineName() 
+          + " has been started");
     }
     if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
   }
@@ -593,7 +619,7 @@ public class TestBlockReport {
         }
         continue;
       }
-      newList.add(new Block(locatedBlks.get(i).getBlock()));
+      newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock()));
     }
     return newList;
   }
@@ -685,7 +711,8 @@ public class TestBlockReport {
 
       // Get block from the first DN
       ret = cluster.getDataNodes().get(DN_N0).
-        data.getStoredBlock(lb.getBlock().getBlockId());
+        data.getStoredBlock(lb.getBlock()
+        .getBlockPoolId(), lb.getBlock().getBlockId());
     return ret;
   }
 

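Every block report in this test now goes through the same three steps, since a report is scoped to one block pool: look up the DataNode, fetch its per-pool DatanodeRegistration, and send the report together with the pool id. Condensed from the hunks above (cluster, blocks and the DN index DN_N0/DN_N1 are the test's own fields):

    DataNode dn = cluster.getDataNodes().get(DN_N0);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    cluster.getNameNode().blockReport(dnR, poolId,
        new BlockListAsLongs(blocks, null).getBlockListAsLongs());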
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java Thu May  5 05:40:07 2011
@@ -44,6 +44,9 @@ public class TestDataNodeMXBean {
 
       MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 
       ObjectName mxbeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
+      // get attribute "ClusterId"
+      String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
+      Assert.assertEquals(datanode.getClusterId(), clusterId);
       // get attribute "Version"
       String version = (String)mbs.getAttribute(mxbeanName, "Version");
       Assert.assertEquals(datanode.getVersion(),version);
@@ -53,10 +56,10 @@ public class TestDataNodeMXBean {
       // get attribute "HttpPort"
       String httpPort = (String)mbs.getAttribute(mxbeanName, "HttpPort");
       Assert.assertEquals(datanode.getHttpPort(),httpPort);
-      // get attribute "NamenodeAddress"
-      String namenodeAddress = (String)mbs.getAttribute(mxbeanName, 
-          "NamenodeAddress");
-      Assert.assertEquals(datanode.getNamenodeAddress(),namenodeAddress);
+      // get attribute "NamenodeAddresses"
+      String namenodeAddresses = (String)mbs.getAttribute(mxbeanName, 
+          "NamenodeAddresses");
+      Assert.assertEquals(datanode.getNamenodeAddresses(),namenodeAddresses);
       // get attribute "getVolumeInfo"
       String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo");
       Assert.assertEquals(replaceDigits(datanode.getVolumeInfo()),

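Since the test reads the new ClusterId and the renamed NamenodeAddresses attributes through standard JMX, the same values can be probed with nothing but java.lang.management and javax.management. A minimal sketch, using the MBean name from the test; it only works inside a JVM where a DataNode has registered that MBean:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class DataNodeMXBeanProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("HadoopInfo:type=DataNodeInfo");
        // Throws InstanceNotFoundException unless a DataNode is running in this JVM.
        System.out.println("ClusterId = " + mbs.getAttribute(name, "ClusterId"));
        System.out.println("NamenodeAddresses = "
            + mbs.getAttribute(name, "NamenodeAddresses"));
      }
    }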
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu May  5 05:40:07 2011
@@ -33,12 +33,13 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 
@@ -119,7 +120,8 @@ public class TestDataNodeVolumeFailure {
     // fail the volume
     // delete/make non-writable one of the directories (failed volume)
     data_fail = new File(dataDir, "data3");
-    failedDir = new File(data_fail, MiniDFSCluster.FINALIZED_DIR_NAME);
+    failedDir = MiniDFSCluster.getFinalizedDir(dataDir, 
+        cluster.getNamesystem().getBlockPoolId());
     if (failedDir.exists() &&
         //!FileUtil.fullyDelete(failedDir)
         !deteteBlocks(failedDir)
@@ -137,8 +139,10 @@ public class TestDataNodeVolumeFailure {
     
     // make sure a block report is sent 
     DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
-    long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs();
-    cluster.getNameNode().blockReport(dn.dnRegistration, bReport);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
+    long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
+    cluster.getNameNode().blockReport(dnR, bpid, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);
@@ -216,7 +220,7 @@ public class TestDataNodeVolumeFailure {
     
     for (LocatedBlock lb : locatedBlocks) {
       DatanodeInfo dinfo = lb.getLocations()[1];
-      Block b = lb.getBlock();
+      ExtendedBlock b = lb.getBlock();
       try {
         accessBlock(dinfo, lb);
       } catch (IOException e) {
@@ -254,8 +258,7 @@ public class TestDataNodeVolumeFailure {
     throws IOException {
     InetSocketAddress targetAddr = null;
     Socket s = null;
-    BlockReader blockReader = null; 
-    Block block = lblock.getBlock(); 
+    ExtendedBlock block = lblock.getBlock(); 
    
     targetAddr = NetUtils.createSocketAddr(datanode.getName());
       
@@ -263,8 +266,10 @@ public class TestDataNodeVolumeFailure {
     s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-    blockReader = 
+    String file = BlockReader.getFileName(targetAddr, 
+        "test-blockpoolid",
+        block.getBlockId());
+    BlockReader blockReader = 
       BlockReader.newBlockReader(s, file, block, lblock
         .getBlockToken(), 0, -1, 4096);
 
@@ -314,9 +319,11 @@ public class TestDataNodeVolumeFailure {
    */
   private int countRealBlocks(Map<String, BlockLocs> map) {
     int total = 0;
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
     for(int i=0; i<dn_num; i++) {
-      for(int j=1; j<=2; j++) {
-        File dir = new File(dataDir, "data"+(2*i+j)+MiniDFSCluster.FINALIZED_DIR_NAME);
+      for(int j=0; j<=1; j++) {
+        File storageDir = MiniDFSCluster.getStorageDir(i, j);
+        File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
         if(dir == null) {
           System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
           continue;
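As an aside, the federation-aware calls exercised above reduce to: look up the block pool id, fetch the per-pool registration, and send a per-pool block report. A minimal sketch, assuming the accessor visibility the test relies on (it lives in org.apache.hadoop.hdfs.server.datanode) and the signatures shown in the hunks; this is illustrative, not part of the patch:

    import java.io.File;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

    class BlockPoolReportSketch {
      // Assumes a running MiniDFSCluster; the calls mirror the hunks above.
      static void sendPerPoolBlockReport(MiniDFSCluster cluster) throws Exception {
        String bpid = cluster.getNamesystem().getBlockPoolId();
        DataNode dn = cluster.getDataNodes().get(0);
        DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);  // registration is per block pool
        long[] report = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
        cluster.getNameNode().blockReport(dnR, bpid, report);        // NN now also takes the pool id
        // Storage layout is per block pool as well: datanode 0, volume 0
        File finalized = MiniDFSCluster.getFinalizedDir(
            MiniDFSCluster.getStorageDir(0, 0), bpid);
        System.out.println("finalized dir: " + finalized);
      }
    }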

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java Thu May  5 05:40:07 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,9 +41,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.*;
+import static org.junit.Assume.assumeTrue;
 
 /**
- * Test successive volume failures, failure metrics and capacity reporting.
+ * Test reporting of DN volume failure counts and metrics.
  */
 public class TestDataNodeVolumeFailureReporting {
 
@@ -56,6 +58,14 @@ public class TestDataNodeVolumeFailureRe
   private Configuration conf;
   private String dataDir;
 
+  // Sleep at least 3 seconds (a 1s heartbeat plus padding) to allow
+  // for heartbeats to propagate from the datanodes to the namenode.
+  final int WAIT_FOR_HEARTBEATS = 3000;
+
+  // Wait at least (2 * re-check + 10 * heartbeat) seconds for
+  // a datanode to be considered dead by the namenode.  
+  final int WAIT_FOR_DEATH = 15000;
+
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
@@ -77,6 +87,10 @@ public class TestDataNodeVolumeFailureRe
 
   @After
   public void tearDown() throws Exception {
+    for (int i = 0; i < 3; i++) {
+      new File(dataDir, "data"+(2*i+1)).setExecutable(true);
+      new File(dataDir, "data"+(2*i+2)).setExecutable(true);
+    }
     cluster.shutdown();
   }
 
@@ -87,41 +101,22 @@ public class TestDataNodeVolumeFailureRe
    */
   @Test
   public void testSuccessiveVolumeFailures() throws Exception {
-    if (System.getProperty("os.name").startsWith("Windows")) {
-      // See above
-      return;
-    }
+    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
+
     // Bring up two more datanodes
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
 
     /*
-     * Sleep at least 3 seconds (a 1s heartbeat plus padding) to allow
-     * for heartbeats to propagate from the datanodes to the namenode.
-     * Sleep  at least (2 * re-check + 10 * heartbeat) 12 seconds for
-     * a datanode  to be called dead by the namenode.
-     */
-    final int WAIT_FOR_HEARTBEATS = 3000;
-    final int WAIT_FOR_DEATH = 15000;
-
-    /*
      * Calculate the total capacity of all the datanodes. Sleep for
      * three seconds to be sure the datanodes have had a chance to
      * heartbeat their capacities.
      */
     Thread.sleep(WAIT_FOR_HEARTBEATS);
-    FSNamesystem namesystem = cluster.getNamesystem();
-    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    namesystem.DFSNodesStatus(live, dead);
-    assertEquals("All DNs should be live", 3, live.size());
-    assertEquals("All DNs should be live", 0, dead.size());
-    long origCapacity = 0;
-    for (final DatanodeDescriptor dn : live) {
-      origCapacity += dn.getCapacity();
-      assertEquals("DN "+dn+" vols should be healthy",
-          0, dn.getVolumeFailures());
-    }
+    FSNamesystem ns = cluster.getNamesystem();
+
+    long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(ns);
+    long dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
 
     File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
     File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
@@ -147,9 +142,9 @@ public class TestDataNodeVolumeFailureRe
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file1, (short)3);
     ArrayList<DataNode> dns = cluster.getDataNodes();
-    assertTrue("DN1 should be up", DataNode.isDatanodeUp(dns.get(0)));
-    assertTrue("DN2 should be up", DataNode.isDatanodeUp(dns.get(1)));
-    assertTrue("DN3 should be up", DataNode.isDatanodeUp(dns.get(2)));
+    assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
+    assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
+    assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
 
     /*
      * The metrics should confirm the volume failures.
@@ -158,27 +153,18 @@ public class TestDataNodeVolumeFailureRe
     DataNodeMetrics metrics2 = dns.get(1).getMetrics();
     DataNodeMetrics metrics3 = dns.get(2).getMetrics();
     assertEquals("Vol1 should report 1 failure",
-        1, metrics1.volumesFailed.getCurrentIntervalValue());
+        1, metrics1.volumeFailures.getCurrentIntervalValue());
     assertEquals("Vol2 should report 1 failure",
-        1, metrics2.volumesFailed.getCurrentIntervalValue());
+        1, metrics2.volumeFailures.getCurrentIntervalValue());
     assertEquals("Vol3 should have no failures",
-        0, metrics3.volumesFailed.getCurrentIntervalValue());
+        0, metrics3.volumeFailures.getCurrentIntervalValue());
 
-    // Eventually the NN should report two volume failures as well
-    while (true) {
-      Thread.sleep(WAIT_FOR_HEARTBEATS);
-      live.clear();
-      dead.clear();
-      namesystem.DFSNodesStatus(live, dead);
-      int volumeFailures = 0;
-      for (final DatanodeDescriptor dn : live) {
-        volumeFailures += dn.getVolumeFailures();
-      }
-      if (2 == volumeFailures) {
-        break;
-      }
-      LOG.warn("Still waiting for volume failures: "+volumeFailures);
-    }
+    // Ensure the total wait budget (10 heartbeat intervals) exceeds the DN death timeout
+    assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
+
+    // Eventually the NN should report two volume failures
+    DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2, 
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
 
     /*
      * Now fail a volume on the third datanode. We should be able to get
@@ -188,12 +174,16 @@ public class TestDataNodeVolumeFailureRe
     Path file2 = new Path("/test2");
     DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file2, (short)3);
-    assertTrue("DN3 should still be up", DataNode.isDatanodeUp(dns.get(2)));
+    assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
     assertEquals("Vol3 should report 1 failure",
-        1, metrics3.volumesFailed.getCurrentIntervalValue());
+        1, metrics3.volumeFailures.getCurrentIntervalValue());
+
+    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    ns.DFSNodesStatus(live, dead);
     live.clear();
     dead.clear();
-    namesystem.DFSNodesStatus(live, dead);
+    ns.DFSNodesStatus(live, dead);
     assertEquals("DN3 should have 1 failed volume",
         1, live.get(2).getVolumeFailures());
 
@@ -202,25 +192,9 @@ public class TestDataNodeVolumeFailureRe
      * total capacity should be down by three volumes (assuming the host
      * did not grow or shrink the data volume while the test was running).
      */
-    while (true) {
-      Thread.sleep(WAIT_FOR_HEARTBEATS);
-      live.clear();
-      dead.clear();
-      namesystem.DFSNodesStatus(live, dead);
-      long currCapacity = 0;
-      long singleVolCapacity = live.get(0).getCapacity();
-      for (final DatanodeDescriptor dn : live) {
-        currCapacity += dn.getCapacity();
-      }
-      LOG.info("Live: "+live.size()+" Dead: "+dead.size());
-      LOG.info("Original capacity: "+origCapacity);
-      LOG.info("Current capacity: "+currCapacity);
-      LOG.info("Volume capacity: "+singleVolCapacity);
-      if (3 == live.size() && 0 == dead.size() &&
-          origCapacity == (currCapacity + (3 * singleVolCapacity))) {
-        break;
-      }
-    }
+    dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
+    DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 3, 
+        origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
 
     /*
      * Now fail the 2nd volume on the 3rd datanode. All its volumes
@@ -232,25 +206,18 @@ public class TestDataNodeVolumeFailureRe
     Path file3 = new Path("/test3");
     DFSTestUtil.createFile(fs, file3, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file3, (short)2);
-    // Eventually the DN should go down
-    while (DataNode.isDatanodeUp(dns.get(2))) {
-      Thread.sleep(1000);
-    }
-    // and report two failed volumes
+
+    // The DN should shut itself down once both of its volumes have failed
+    DFSTestUtil.waitForDatanodeDeath(dns.get(2));
+
+    // And report two failed volumes
     metrics3 = dns.get(2).getMetrics();
     assertEquals("DN3 should report 2 vol failures",
-        2, metrics3.volumesFailed.getCurrentIntervalValue());
-    // and eventually be seen as dead by the NN.
-    while (true) {
-      Thread.sleep(WAIT_FOR_DEATH);
-      live.clear();
-      dead.clear();
-      namesystem.DFSNodesStatus(live, dead);
-      if (1 == dead.size() && 2 == live.size()) {
-        break;
-      }
-      LOG.warn("Still waiting for dn to die: "+dead.size());
-    }
+        2, metrics3.volumeFailures.getCurrentIntervalValue());
+
+    // Eventually the NN should also mark the DN dead
+    DFSTestUtil.waitForDatanodeStatus(ns, 2, 1, 2, 
+        origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
 
     /*
      * The datanode never tries to restore the failed volume, even if
@@ -273,105 +240,44 @@ public class TestDataNodeVolumeFailureRe
      * and that the volume failure count should be reported as zero by
      * both the metrics and the NN.
      */
-    while (true) {
-      Thread.sleep(WAIT_FOR_DEATH);
-      live.clear();
-      dead.clear();
-      namesystem.DFSNodesStatus(live, dead);
-      assertEquals("All DNs should be live", 3, live.size());
-      assertEquals("All DNs should be live", 0, dead.size());
-      long currCapacity = 0;
-      long volFailures = 0;
-      for (final DatanodeDescriptor dn : live) {
-        currCapacity += dn.getCapacity();
-        volFailures += dn.getVolumeFailures();
-      }
-      if (3 == live.size() && 0 == dead.size() && 0 == volFailures &&
-          origCapacity == currCapacity) {
-        break;
-      }
-      LOG.warn("Waiting for capacity: original="+origCapacity+" current="+
-          currCapacity+" live="+live.size()+" dead="+dead.size()+
-          " vols="+volFailures);
-    }
+    DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 0, origCapacity, 
+        WAIT_FOR_HEARTBEATS);
   }
 
   /**
-   * Test the DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY configuration
-   * option, ie the DN shuts itself down when the number of failures
-   * experienced drops below the tolerated amount.
+   * Test that the NN re-learns of volume failures after restart.
    */
   @Test
-  public void testConfigureMinValidVolumes() throws Exception {
-    if (System.getProperty("os.name").startsWith("Windows")) {
-      // See above
-      return;
-    }
+  public void testVolFailureStatsPreservedOnNNRestart() throws Exception {
+    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
 
-    // Bring up two additional datanodes that need both of their volumes
-    // functioning in order to stay up.
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+    // Bring up two more datanodes that can tolerate 1 failure
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
 
-    // Fail a volume on the 2nd DN
+    FSNamesystem ns = cluster.getNamesystem();
+    long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(ns);
+    long dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
+
+    // Fail the first volume on the first two datanodes (keep the third
+    // datanode healthy so at least one node in the write pipeline stays up).
+    File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
     File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
+    assertTrue("Couldn't chmod local vol", dn1Vol1.setExecutable(false));
     assertTrue("Couldn't chmod local vol", dn2Vol1.setExecutable(false));
 
-    // Should only get two replicas (the first DN and the 3rd)
     Path file1 = new Path("/test1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
+    DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
     DFSTestUtil.waitReplication(fs, file1, (short)2);
 
-    // Check that this single failure caused a DN to die.
-    while (true) {
-      final int WAIT_FOR_DEATH = 15000;
-      Thread.sleep(WAIT_FOR_DEATH);
-      FSNamesystem namesystem = cluster.getNamesystem();
-      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-      namesystem.DFSNodesStatus(live, dead);
-      if (1 == dead.size()) {
-        break;
-      }
-      LOG.warn("Waiting for datanode to die: "+dead.size());
-    }
-
-    // If we restore the volume we should still only be able to get
-    // two replicas since the DN is still considered dead.
-    assertTrue("Couldn't chmod local vol", dn2Vol1.setExecutable(true));
-    Path file2 = new Path("/test2");
-    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
-    DFSTestUtil.waitReplication(fs, file2, (short)2);
-  }
+    // The NN reports two volume failures
+    DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2, 
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
 
-  /**
-   * Test invalid DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY values.
-   */
-  @Test
-  public void testInvalidFailedVolumesConfig() throws Exception {
-    if (System.getProperty("os.name").startsWith("Windows")) {
-      // See above
-      return;
-    }
-    /*
-     * Bring up another datanode that has an invalid value set.
-     * We should still be able to create a file with two replicas
-     * since the minimum valid volume parameter is only checked
-     * when we experience a disk error.
-     */
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, -1);
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();
-    Path file1 = new Path("/test1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
-    DFSTestUtil.waitReplication(fs, file1, (short)2);
-    // Ditto if the value is too big.
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 100);
-    cluster.startDataNodes(conf, 1, true, null, null);
+    // After restarting, the NN should still see the two failures
+    cluster.restartNameNode(0);
     cluster.waitActive();
-    Path file2 = new Path("/test1");
-    DFSTestUtil.createFile(fs, file2, 1024, (short)2, 1L);
-    DFSTestUtil.waitReplication(fs, file2, (short)2);
+    DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2,
+        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
   }
 }
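The hand-rolled polling loops removed above all collapse into the new DFSTestUtil helpers. A minimal sketch, with the argument meaning inferred from the call sites in this patch (expected live nodes, dead nodes, failed volumes, expected capacity, polling interval in ms); treat it as an illustration of the helper, not a definitive signature:

    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

    class VolumeFailureWaitSketch {
      // Blocks until the NN reports the expected node and volume-failure state.
      static void waitForTwoFailedVolumes(MiniDFSCluster cluster) throws Exception {
        FSNamesystem ns = cluster.getNamesystem();
        long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(ns);
        long dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);  // capacity of datanode 0
        // 3 live, 0 dead, 2 failed volumes, capacity down by one datanode's worth
        // (its two volumes), polling every 3 seconds
        DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2, origCapacity - dnCapacity, 3000);
      }
    }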

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Thu May  5 05:40:07 2011
@@ -98,7 +98,7 @@ public class TestDatanodeRestart {
       out.write(writeBuf);
       out.hflush();
       DataNode dn = cluster.getDataNodes().get(0);
-      for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
+      for (FSVolume volume : ((FSDataset)dn.data).volumes.getVolumes()) {
         File currentDir = volume.getDir().getParentFile();
         File rbwDir = new File(currentDir, "rbw");
         for (File file : rbwDir.listFiles()) {
@@ -112,16 +112,17 @@ public class TestDatanodeRestart {
       dn = cluster.getDataNodes().get(0);
 
       // check volumeMap: one rwr replica
+      String bpid = cluster.getNamesystem().getBlockPoolId();
       ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
-      Assert.assertEquals(1, replicas.size());
-      ReplicaInfo replica = replicas.replicas().iterator().next();
+      Assert.assertEquals(1, replicas.size(bpid));
+      ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
       Assert.assertEquals(ReplicaState.RWR, replica.getState());
       if (isCorrupt) {
         Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
       } else {
         Assert.assertEquals(fileLen, replica.getNumBytes());
       }
-      dn.data.invalidate(new Block[]{replica});
+      dn.data.invalidate(bpid, new Block[]{replica});
     } finally {
       IOUtils.closeStream(out);
       if (fs.exists(src)) {
@@ -146,9 +147,10 @@ public class TestDatanodeRestart {
         DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
         DFSTestUtil.waitReplication(fs, fileName, (short)1);
       }
+      String bpid = cluster.getNamesystem().getBlockPoolId();
       DataNode dn = cluster.getDataNodes().get(0);
       Iterator<ReplicaInfo> replicasItor = 
-        ((FSDataset)dn.data).volumeMap.replicas().iterator();
+        ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
       ReplicaInfo replica = replicasItor.next();
       createUnlinkTmpFile(replica, true, true); // rename block file
       createUnlinkTmpFile(replica, false, true); // rename meta file
@@ -165,7 +167,7 @@ public class TestDatanodeRestart {
 
       // check volumeMap: 4 finalized replica
       Collection<ReplicaInfo> replicas = 
-        ((FSDataset)(dn.data)).volumeMap.replicas();
+        ((FSDataset)(dn.data)).volumeMap.replicas(bpid);
       Assert.assertEquals(4, replicas.size());
       replicasItor = replicas.iterator();
       while (replicasItor.hasNext()) {
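For what it's worth, the per-block-pool ReplicasMap lookups used above come down to the sketch below. It assumes the package-private access the test has (it sits in org.apache.hadoop.hdfs.server.datanode, so the dn.data, FSDataset and ReplicasMap references resolve) and only the signatures visible in the hunks:

    // Inside the org.apache.hadoop.hdfs.server.datanode package, as the test is:
    String bpid = cluster.getNamesystem().getBlockPoolId();
    DataNode dn = cluster.getDataNodes().get(0);
    ReplicasMap map = ((FSDataset) dn.data).volumeMap;
    System.out.println("replicas in pool: " + map.size(bpid));  // sizes are per block pool
    for (ReplicaInfo r : map.replicas(bpid)) {                  // so are the replica views
      System.out.println(r.getState() + " " + r.getNumBytes());
    }
    // Invalidation also names the pool
    ReplicaInfo first = map.replicas(bpid).iterator().next();
    dn.data.invalidate(bpid, new Block[] { first });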

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Thu May  5 05:40:07 2011
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -48,6 +50,7 @@ public class TestDirectoryScanner extend
   private static final int DEFAULT_GEN_STAMP = 9999;
 
   private MiniDFSCluster cluster;
+  private String bpid;
   private FSDataset fds = null;
   private DirectoryScanner scanner = null;
   private Random rand = new Random();
@@ -69,7 +72,7 @@ public class TestDirectoryScanner extend
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
     synchronized (fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Truncate a block file that has a corresponding metadata file
@@ -88,7 +91,7 @@ public class TestDirectoryScanner extend
   /** Delete a block file */
   private long deleteBlockFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Delete a block file that has corresponding metadata file
@@ -104,7 +107,7 @@ public class TestDirectoryScanner extend
   /** Delete block meta file */
   private long deleteMetaFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
         if (file.exists() && file.delete()) {
@@ -121,7 +124,7 @@ public class TestDirectoryScanner extend
     long id = rand.nextLong();
     while (true) {
       id = rand.nextLong();
-      if (fds.fetchReplicaInfo(id) == null) {
+      if (fds.fetchReplicaInfo(bpid, id) == null) {
         break;
       }
     }
@@ -139,10 +142,11 @@ public class TestDirectoryScanner extend
 
   /** Create a block file in a random volume*/
   private long createBlockFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
     }
@@ -151,10 +155,11 @@ public class TestDirectoryScanner extend
 
   /** Create a metafile in a random volume*/
   private long createMetaFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getMetaFile(id));
     if (file.createNewFile()) {
       LOG.info("Created metafile " + file.getName());
     }
@@ -163,10 +168,11 @@ public class TestDirectoryScanner extend
 
  /** Create block file and corresponding metafile in a random volume */
   private long createBlockMetaFile() throws IOException {
-    FSVolume[] volumes = fds.volumes.volumes;
-    int index = rand.nextInt(volumes.length - 1);
+    List<FSVolume> volumes = fds.volumes.getVolumes();
+    int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
 
@@ -185,7 +191,7 @@ public class TestDirectoryScanner extend
         LOG.info("Created extraneous file " + name2);
       }
 
-      file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+      file = new File(finalizedDir, getMetaFile(id));
       if (file.createNewFile()) {
         LOG.info("Created metafile " + file.getName());
       }
@@ -196,12 +202,18 @@ public class TestDirectoryScanner extend
   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
       long missingMemoryBlocks, long mismatchBlocks) {
     scanner.reconcile();
-    assertEquals(totalBlocks, scanner.totalBlocks);
-    assertEquals(diffsize, scanner.diff.size());
-    assertEquals(missingMetaFile, scanner.missingMetaFile);
-    assertEquals(missingBlockFile, scanner.missingBlockFile);
-    assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks);
-    assertEquals(mismatchBlocks, scanner.mismatchBlocks);
+    
+    assertTrue(scanner.diffs.containsKey(bpid));
+    LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
+    assertTrue(scanner.stats.containsKey(bpid));
+    DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+    
+    assertEquals(diffsize, diff.size());
+    assertEquals(totalBlocks, stats.totalBlocks);
+    assertEquals(missingMetaFile, stats.missingMetaFile);
+    assertEquals(missingBlockFile, stats.missingBlockFile);
+    assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
+    assertEquals(mismatchBlocks, stats.mismatchBlocks);
   }
 
   public void testDirectoryScanner() throws Exception {
@@ -215,10 +227,12 @@ public class TestDirectoryScanner extend
     cluster = new MiniDFSCluster.Builder(CONF).build();
     try {
       cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
       fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks
       createFile("/tmp/t1", 10000);
@@ -318,19 +332,26 @@ public class TestDirectoryScanner extend
       truncateBlockFile();
       scan(totalBlocks+3, 6, 2, 2, 3, 2);
       scan(totalBlocks+1, 0, 0, 0, 0, 0);
+      
+      // Test14: validate clean shutdown of DirectoryScanner
+      ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+      
     } finally {
+      scanner.shutdown();
       cluster.shutdown();
     }
   }
 
   private void verifyAddition(long blockId, long genStamp, long size) {
     final ReplicaInfo replicainfo;
-    replicainfo = fds.fetchReplicaInfo(blockId);
+    replicainfo = fds.fetchReplicaInfo(bpid, blockId);
     assertNotNull(replicainfo);
 
     // Added block has the same file as the one created by the test
     File file = new File(getBlockFile(blockId));
-    assertEquals(file.getName(), fds.findBlockFile(blockId).getName());
+    assertEquals(file.getName(), fds.findBlockFile(bpid, blockId).getName());
 
     // Generation stamp is same as that of created file
     assertEquals(genStamp, replicainfo.getGenerationStamp());
@@ -341,12 +362,12 @@ public class TestDirectoryScanner extend
 
   private void verifyDeletion(long blockId) {
     // Ensure block does not exist in memory
-    assertNull(fds.fetchReplicaInfo(blockId));
+    assertNull(fds.fetchReplicaInfo(bpid, blockId));
   }
 
   private void verifyGenStamp(long blockId, long genStamp) {
     final ReplicaInfo memBlock;
-    memBlock = fds.fetchReplicaInfo(blockId);
+    memBlock = fds.fetchReplicaInfo(bpid, blockId);
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }
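The scanner's counters are likewise keyed by block pool now. A short sketch of reading them back, assuming the field visibility the test uses, an initialized DirectoryScanner named scanner, and the bpid obtained as above:

    // Keep the per-pool results around instead of discarding them after reconcile()
    scanner.setRetainDiffs(true);
    scanner.reconcile();
    LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
    DirectoryScanner.Stats stats = scanner.stats.get(bpid);
    System.out.println("blocks=" + stats.totalBlocks
        + " missingMeta=" + stats.missingMetaFile
        + " mismatched=" + stats.mismatchBlocks
        + " diffs=" + diff.size());
    // Clean shutdown; getRunStatus() should report false afterwards
    scanner.shutdown();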

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Thu May  5 05:40:07 2011
@@ -51,7 +51,6 @@ public class TestDiskError {
   private FileSystem fs;
   private MiniDFSCluster cluster;
   private Configuration conf;
-  private String dataDir;
 
   @Before
   public void setUp() throws Exception {
@@ -60,7 +59,6 @@ public class TestDiskError {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
-    dataDir = cluster.getDataDirectory();
   }
 
   @After
@@ -86,8 +84,11 @@ public class TestDiskError {
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
     final int dnIndex = 0;
-    File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "current/rbw");
-    File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "current/rbw");
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    File storageDir = MiniDFSCluster.getStorageDir(dnIndex, 0);
+    File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+    storageDir = MiniDFSCluster.getStorageDir(dnIndex, 1);
+    File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
     try {
       // make the data directory of the first datanode to be readonly
       assertTrue("Couldn't chmod local vol", dir1.setReadOnly());
@@ -95,7 +96,7 @@ public class TestDiskError {
 
       // create files and make sure that first datanode will be down
       DataNode dn = cluster.getDataNodes().get(dnIndex);
-      for (int i=0; DataNode.isDatanodeUp(dn); i++) {
+      for (int i=0; dn.isDatanodeUp(); i++) {
         Path fileName = new Path("/test.txt"+i);
         DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
         DFSTestUtil.waitReplication(fs, fileName, (short)2);
@@ -152,9 +153,11 @@ public class TestDiskError {
     out.close();
 
     // the temporary block & meta files should be deleted
-    String dataDir = cluster.getDataDirectory();
-    File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "current/rbw");
-    File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "current/rbw");
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    File storageDir = MiniDFSCluster.getStorageDir(sndNode, 0);
+    File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+    storageDir = MiniDFSCluster.getStorageDir(sndNode, 1);
+    File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
     while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
       Thread.sleep(100);
     }
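Similarly, the rbw directories are now resolved through MiniDFSCluster helpers instead of hard-coded "data<N>/current/rbw" paths. A minimal sketch using only the helpers shown in the hunks above (the class and method names here are illustrative):

    import java.io.File;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    class RbwDirSketch {
      // Resolves the per-block-pool rbw directories of a datanode's two volumes.
      static void printRbwDirs(MiniDFSCluster cluster, int dnIndex) {
        String bpid = cluster.getNamesystem().getBlockPoolId();
        File vol0 = MiniDFSCluster.getStorageDir(dnIndex, 0);  // datanode index, volume index
        File vol1 = MiniDFSCluster.getStorageDir(dnIndex, 1);
        System.out.println(MiniDFSCluster.getRbwDir(vol0, bpid));
        System.out.println(MiniDFSCluster.getRbwDir(vol1, bpid));
      }
    }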


