hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1076447 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date Wed, 02 Mar 2011 22:59:32 GMT
Author: suresh
Date: Wed Mar  2 22:59:31 2011
New Revision: 1076447

URL: http://svn.apache.org/viewvc?rev=1076447&view=rev
Log:
HDFS-1694. Federation: SimulatedFSDataset changes to work with federation and multiple block
pools. Contributed by Suresh Srinivas.


Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1076447&r1=1076446&r2=1076447&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Wed Mar  2 22:59:31 2011
@@ -144,7 +144,10 @@ Trunk (unreleased changes)
     HDFS-1701. Federation: Fix TestHeartbeathandling.
     (Erik Steffl and Tanping Wang via suresh)
 
-    HDFS-1693. Fix TestDFSStorageStateRecovery failure. (suresh)
+    HDFS-1693. Federation: Fix TestDFSStorageStateRecovery failure. (suresh)
+
+    HDFS-1694. Federation: SimulatedFSDataset changes to work with
+    federation and multiple block pools. (suresh)
 
   IMPROVEMENTS
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1076447&r1=1076446&r2=1076447&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Mar  2 22:59:31 2011
@@ -744,8 +744,6 @@ public class DataNode extends Configured
         bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
         bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
         bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
-        // TODO: FEDERATION 
-        //bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID;
       } else {
         // read storage info, lock data dirs and transition fs state if necessary       
  
         storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
@@ -1078,20 +1076,12 @@ public class DataNode extends Configured
         }
       }
 
-      
-      // TODO:FEDERATION - reavaluate the following two checks!!!!!
-      assert ("".equals(storage.getStorageID())
-          && !"".equals(bpRegistration.getStorageID()))
-          || storage.getStorageID().equals(bpRegistration.getStorageID()) :
-      "New storageID can be assigned only if data-node is not formatted";
-
       if (storage.getStorageID().equals("")) {
         storage.setStorageID(bpRegistration.getStorageID());
         storage.writeAll();
         LOG.info("New storage id " + bpRegistration.getStorageID()
             + " is assigned to data-node " + bpRegistration.getName());
-      }
-      if(! storage.getStorageID().equals(bpRegistration.getStorageID())) {
+      } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
         throw new IOException("Inconsistent storage IDs. Name-node returned "
             + bpRegistration.getStorageID() 
             + ". Expecting " + storage.getStorageID());
@@ -1334,15 +1324,11 @@ public class DataNode extends Configured
 
     if (simulatedFSDataset) {
       setNewStorageID(datanodeId);
-      conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY,
-          datanodeId.getStorageID());
-
       // it would have been better to pass storage as a parameter to
       // constructor below - need to augment ReflectionUtils used below.
-
+      conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, datanodeId
+          .getStorageID());
       try {
-        // TODO:FEDERATION Equivalent of following (can't do because Simulated
-        // is in test dir)
         data = (FSDatasetInterface) ReflectionUtils.newInstance(
             Class.forName(
             "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
@@ -1350,8 +1336,6 @@ public class DataNode extends Configured
       } catch (ClassNotFoundException e) {
         throw new IOException(StringUtils.stringifyException(e));
       }
-      // TODO:FEDERATION do we need set it to the general dnRegistration?????
-      // TODO:FEDERATION do we need LV,NSid, cid,bpid for datanode version file?
     } else {
       data = new FSDataset(storage, conf);
     }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1076447&r1=1076446&r2=1076447&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Mar  2 22:59:31 2011
@@ -76,7 +76,6 @@ import org.apache.hadoop.io.IOUtils;
 @InterfaceAudience.Private
 public class FSDataset implements FSConstants, FSDatasetInterface {
 
-
   /**
    * A node type that can be built into a tree reflecting the
    * hierarchy of blocks on the local disk.
@@ -624,8 +623,7 @@ public class FSDataset implements FSCons
     BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
       BlockPoolSlice bp = map.get(bpid);
       if (bp == null) {
-        // TODO:FEDERATION cleanup this exception
-        throw new IOException("block pool " + bpid + " not found");
+        throw new IOException("block pool " + bpid + " is not found");
       }
       return bp;
     }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=1076447&r1=1076446&r2=1076447&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Wed Mar  2 22:59:31 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1076447&r1=1076446&r2=1076447&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Mar  2 22:59:31 2011
@@ -38,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -93,13 +95,14 @@ public class SimulatedFSDataset  impleme
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
-    BInfo(Block b, boolean forWriting) throws IOException {
+    BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
         theBlock.setNumBytes(0);
       }
-      if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may
-                                          // be more - we find out at finalize
+      if (!storage.alloc(bpid, theBlock.getNumBytes())) { 
+        // expected length - actual length may
+        // be more - we find out at finalize
         DataNode.LOG.warn("Lack of free storage on a block alloc");
         throw new IOException("Creating block, no free space available");
       }
@@ -142,7 +145,8 @@ public class SimulatedFSDataset  impleme
       }
     }
     
-    synchronized void finalizeBlock(long finalSize) throws IOException {
+    synchronized void finalizeBlock(String bpid, long finalSize)
+        throws IOException {
       if (finalized) {
         throw new IOException(
             "Finalizing a block that has already been finalized" + 
@@ -163,12 +167,12 @@ public class SimulatedFSDataset  impleme
       // adjust if necessary
       long extraLen = finalSize - theBlock.getNumBytes();
       if (extraLen > 0) {
-        if (!storage.alloc(extraLen)) {
+        if (!storage.alloc(bpid,extraLen)) {
           DataNode.LOG.warn("Lack of free storage on a block alloc");
           throw new IOException("Creating block, no free space available");
         }
       } else {
-        storage.free(-extraLen);
+        storage.free(bpid, -extraLen);
       }
       theBlock.setNumBytes(finalSize);  
 
@@ -261,12 +265,41 @@ public class SimulatedFSDataset  impleme
     }
   }
   
-  static private class SimulatedStorage {
-    private long capacity;  // in bytes
+  /**
+   * Class is used for tracking block pool storage utilization similar
+   * to {@link BlockPoolSlice}
+   */
+  private static class SimulatedBPStorage {
     private long used;    // in bytes
     
+    long getUsed() {
+      return used;
+    }
+    
+    void alloc(long amount) {
+      used += amount;
+    }
+    
+    void free(long amount) {
+      used -= amount;
+    }
+    
+    SimulatedBPStorage() {
+      used = 0;   
+    }
+  }
+  
+  /**
+   * Class used for tracking datanode level storage utilization similar
+   * to {@link FSVolumeSet}
+   */
+  private static class SimulatedStorage {
+    private Map<String, SimulatedBPStorage> map = 
+      new HashMap<String, SimulatedBPStorage>();
+    private long capacity;  // in bytes
+    
     synchronized long getFree() {
-      return capacity - used;
+      return capacity - getUsed();
     }
     
     synchronized long getCapacity() {
@@ -274,25 +307,47 @@ public class SimulatedFSDataset  impleme
     }
     
     synchronized long getUsed() {
+      long used = 0;
+      for (SimulatedBPStorage bpStorage : map.values()) {
+        used += bpStorage.getUsed();
+      }
       return used;
     }
     
-    synchronized boolean alloc(long amount) {
+    synchronized long getBlockPoolUsed(String bpid) throws IOException {
+      return getBPStorage(bpid).getUsed();
+    }
+    
+    synchronized boolean alloc(String bpid, long amount) throws IOException {
       if (getFree() >= amount) {
-        used += amount;
+        getBPStorage(bpid).alloc(amount);
         return true;
-      } else {
-        return false;    
       }
+      return false;    
     }
     
-    synchronized void free(long amount) {
-      used -= amount;
+    synchronized void free(String bpid, long amount) throws IOException {
+      getBPStorage(bpid).free(amount);
     }
     
     SimulatedStorage(long cap) {
       capacity = cap;
-      used = 0;   
+    }
+    
+    synchronized void addBlockPool(String bpid) {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage != null) {
+        return;
+      }
+      map.put(bpid, new SimulatedBPStorage());
+    }
+    
+    private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage == null) {
+        throw new IOException("block pool " + bpid + " not found");
+      }
+      return bpStorage;
     }
   }
   
@@ -304,7 +359,9 @@ public class SimulatedFSDataset  impleme
     setConf(conf);
   }
   
-  private SimulatedFSDataset() { // real construction when setConf called.. Uggg
+  // Constructor used for constructing the object using reflection
+  @SuppressWarnings("unused")
+  private SimulatedFSDataset() { // real construction when setConf called..
   }
   
   public Configuration getConf() {
@@ -318,9 +375,6 @@ public class SimulatedFSDataset  impleme
     registerMBean(storageId);
     storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
-    //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + 
-    //    "Used = " + getDfsUsed() + "Free =" + getRemaining());
-
     blockMap = new HashMap<String, Map<Block,BInfo>>(); 
   }
 
@@ -346,7 +400,7 @@ public class SimulatedFSDataset  impleme
       }
       
       for (Block b: injectBlocks) {
-        BInfo binfo = new BInfo(b, false);
+        BInfo binfo = new BInfo(bpid, b, false);
         map.put(binfo.theBlock, binfo);
       }
     }
@@ -368,7 +422,7 @@ public class SimulatedFSDataset  impleme
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
-    binfo.finalizeBlock(b.getNumBytes());
+    binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes());
   }
 
   @Override // FSDatasetInterface
@@ -411,8 +465,7 @@ public class SimulatedFSDataset  impleme
 
   @Override // FSDatasetMBean
   public long getBlockPoolUsed(String bpid) throws IOException {
-    // TODO:FEDERATION currently a single block pool is supported
-    return storage.getUsed();
+    return storage.getBlockPoolUsed(bpid);
   }
   
   @Override // FSDatasetMBean
@@ -471,7 +524,7 @@ public class SimulatedFSDataset  impleme
         DataNode.LOG.warn("Invalidate: Missing block");
         continue;
       }
-      storage.free(binfo.getNumBytes());
+      storage.free(bpid, binfo.getNumBytes());
       blockMap.remove(b);
     }
     if (error) {
@@ -550,7 +603,7 @@ public class SimulatedFSDataset  impleme
           + " is not valid, and cannot be appended to.");
     }
     if (!binfo.isFinalized()) {
-      binfo.finalizeBlock(binfo.getNumBytes());
+      binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes());
     }
     map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
@@ -594,7 +647,7 @@ public class SimulatedFSDataset  impleme
             " is being written, and cannot be written to.");
     }
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = new BInfo(b.getLocalBlock(), true);
+    BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
     map.put(binfo.theBlock, binfo);
     return binfo;
   }
@@ -885,5 +938,6 @@ public class SimulatedFSDataset  impleme
   public void addBlockPool(String bpid, Configuration conf) {
     Map<Block, BInfo> map = new HashMap<Block, BInfo>();
     blockMap.put(bpid, map);
+    storage.addBlockPool(bpid);
   }
 }



Mime
View raw message