hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1097905 [7/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/ja...
Date Fri, 29 Apr 2011 18:16:38 GMT
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Fri Apr 29 18:16:32 2011
@@ -19,24 +19,60 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 
+/**
+ * Maintains the replicas map. 
+ */
 class ReplicasMap {
-  // HashMap: maps a block id to the replica's meta info
-  private HashMap<Long, ReplicaInfo> map = new HashMap<Long, ReplicaInfo>();
+  // Object using which this class is synchronized
+  private final Object mutex;
+  
+  // Map of block pool Id to another map of block Id to ReplicaInfo.
+  private Map<String, Map<Long, ReplicaInfo>> map = 
+    new HashMap<String, Map<Long, ReplicaInfo>>();
+  
+  ReplicasMap(Object mutex) {
+    if (mutex == null) {
+      throw new HadoopIllegalArgumentException(
+          "Object to synchronize on cannot be null");
+    }
+    this.mutex = mutex;
+  }
+  
+  String[] getBlockPoolList() {
+    synchronized(mutex) {
+      return map.keySet().toArray(new String[map.keySet().size()]);   
+    }
+  }
+  
+  private void checkBlockPool(String bpid) {
+    if (bpid == null) {
+      throw new IllegalArgumentException("Block Pool Id is null");
+    }
+  }
+  
+  private void checkBlock(Block b) {
+    if (b == null) {
+      throw new IllegalArgumentException("Block is null");
+    }
+  }
+  
   /**
    * Get the meta information of the replica that matches both block id 
    * and generation stamp
+   * @param bpid block pool id
    * @param block block with its id as the key
    * @return the replica's meta information
-   * @throws IllegalArgumentException if the input block is null
+   * @throws IllegalArgumentException if the input block or block pool is null
    */
-  ReplicaInfo get(Block block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Do not expect null block");
-    }
-    ReplicaInfo replicaInfo = get(block.getBlockId());
+  ReplicaInfo get(String bpid, Block block) {
+    checkBlockPool(bpid);
+    checkBlock(block);
+    ReplicaInfo replicaInfo = get(bpid, block.getBlockId());
     if (replicaInfo != null && 
         block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
       return replicaInfo;
@@ -44,72 +80,139 @@ class ReplicasMap {
     return null;
   }
   
+  
   /**
    * Get the meta information of the replica that matches the block id
+   * @param bpid block pool id
    * @param blockId a block's id
    * @return the replica's meta information
    */
-  ReplicaInfo get(long blockId) {
-    return map.get(blockId);
+  ReplicaInfo get(String bpid, long blockId) {
+    checkBlockPool(bpid);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      return m != null ? m.get(blockId) : null;
+    }
   }
   
   /**
    * Add a replica's meta information into the map 
    * 
+   * @param bpid block pool id
    * @param replicaInfo a replica's meta information
    * @return previous meta information of the replica
    * @throws IllegalArgumentException if the input parameter is null
    */
-  ReplicaInfo add(ReplicaInfo replicaInfo) {
-    if (replicaInfo == null) {
-      throw new IllegalArgumentException("Do not expect null block");
+  ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
+        // Add an entry for block pool if it does not exist already
+        m = new HashMap<Long, ReplicaInfo>();
+        map.put(bpid, m);
+      }
+      return  m.put(replicaInfo.getBlockId(), replicaInfo);
     }
-    return  map.put(replicaInfo.getBlockId(), replicaInfo);
   }
   
   /**
    * Remove the replica's meta information from the map that matches
    * the input block's id and generation stamp
+   * @param bpid block pool id
    * @param block block with its id as the key
    * @return the removed replica's meta information
    * @throws IllegalArgumentException if the input block is null
    */
-  ReplicaInfo remove(Block block) {
-    if (block == null) {
-      throw new IllegalArgumentException("Do not expect null block");
-    }
-    Long key = Long.valueOf(block.getBlockId());
-    ReplicaInfo replicaInfo = map.get(key);
-    if (replicaInfo != null &&
-        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-      return remove(key);
-    } 
+  ReplicaInfo remove(String bpid, Block block) {
+    checkBlockPool(bpid);
+    checkBlock(block);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m != null) {
+        Long key = Long.valueOf(block.getBlockId());
+        ReplicaInfo replicaInfo = m.get(key);
+        if (replicaInfo != null &&
+            block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+          return m.remove(key);
+        } 
+      }
+    }
     
     return null;
   }
   
   /**
    * Remove the replica's meta information from the map if present
+   * @param bpid block pool id
    * @param the block id of the replica to be removed
    * @return the removed replica's meta information
    */
-  ReplicaInfo remove(long blockId) {
-    return map.remove(blockId);
+  ReplicaInfo remove(String bpid, long blockId) {
+    checkBlockPool(bpid);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m != null) {
+        return m.remove(blockId);
+      }
+    }
+    return null;
   }
  
   /**
-   * Get the size of the map
+   * Get the size of the map for given block pool
+   * @param bpid block pool id
    * @return the number of replicas in the map
    */
-  int size() {
-    return map.size();
+  int size(String bpid) {
+    Map<Long, ReplicaInfo> m = null;
+    synchronized(mutex) {
+      m = map.get(bpid);
+      return m != null ? m.size() : 0;
+    }
+  }
+  
+  /**
+   * Get a collection of the replicas for given block pool
+   * This method is <b>not synchronized</b>. It needs to be synchronized
+   * externally using the mutex, both for getting the replicas
+   * values from the map and iterating over it. Mutex can be accessed using
+   * {@link #getMutext()} method.
+   * 
+   * @param bpid block pool id
+   * @return a collection of the replicas belonging to the block pool
+   */
+  Collection<ReplicaInfo> replicas(String bpid) {
+    Map<Long, ReplicaInfo> m = null;
+    m = map.get(bpid);
+    return m != null ? m.values() : null;
+  }
+
+  void initBlockPool(String bpid) {
+    checkBlockPool(bpid);
+    synchronized(mutex) {
+      Map<Long, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
+        // Add an entry for block pool if it does not exist already
+        m = new HashMap<Long, ReplicaInfo>();
+        map.put(bpid, m);
+      }
+    }
+  }
+  
+  void cleanUpBlockPool(String bpid) {
+    checkBlockPool(bpid);
+    synchronized(mutex) {
+      map.remove(bpid);
+    }
   }
   
   /**
-   * Get a collection of the replicas
-   * @return a collection of the replicas
+   * Give access to mutex used for synchronizing ReplicasMap
+   * @return object used as lock
    */
-  Collection<ReplicaInfo> replicas() {
-    return map.values();
+  Object getMutext() {
+    return mutex;
   }
-}
+}
\ No newline at end of file
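
A minimal usage sketch of the new per-block-pool map, using only the API as committed above (the helper, its name, and bpid are illustrative). Note that replicas() is deliberately left unsynchronized, so both the lookup and the iteration must hold the shared mutex:

    // Illustrative sketch, not part of the commit: collect the block ids of
    // one block pool. replicas(bpid) returns a live view of the inner
    // HashMap's values, so the mutex is held across lookup and iteration.
    static long[] blockIdsOf(ReplicasMap volumeMap, String bpid) {
      synchronized (volumeMap.getMutext()) {       // accessor name as committed
        Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
        if (replicas == null) {
          return new long[0];                      // block pool not initialized
        }
        long[] ids = new long[replicas.size()];
        int i = 0;
        for (ReplicaInfo r : replicas) {           // safe: no concurrent add()/remove()
          ids[i++] = r.getBlockId();
        }
        return ids;
      }
    }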

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java Fri Apr 29 18:16:32 2011
@@ -18,31 +18,33 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
 
-  int curVolume = 0;
+  private int curVolume = 0;
 
   @Override
-  public synchronized FSVolume chooseVolume(FSVolume[] volumes, long blockSize)
+  public synchronized FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
       throws IOException {
-    if(volumes.length < 1) {
+    if(volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
     
     // since volumes could've been removed because of the failure
     // make sure we are not out of bounds
-    if(curVolume >= volumes.length) {
+    if(curVolume >= volumes.size()) {
       curVolume = 0;
     }
     
     int startVolume = curVolume;
     
     while (true) {
-      FSVolume volume = volumes[curVolume];
-      curVolume = (curVolume + 1) % volumes.length;
+      FSVolume volume = volumes.get(curVolume);
+      curVolume = (curVolume + 1) % volumes.size();
       if (volume.getAvailable() > blockSize) { return volume; }
       if (curVolume == startVolume) {
         throw new DiskOutOfSpaceException("Insufficient space for an additional block");
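
The same selection logic, reduced to a self-contained sketch; the HasAvailable interface and class name here are illustrative stand-ins for FSVolume and its getAvailable():

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

    // Illustrative generic reduction of the policy above.
    interface HasAvailable { long getAvailable() throws IOException; }

    class RoundRobinChooser<T extends HasAvailable> {
      private int curVolume = 0;

      synchronized T choose(List<T> volumes, long blockSize) throws IOException {
        if (volumes.isEmpty())
          throw new DiskOutOfSpaceException("No more available volumes");
        if (curVolume >= volumes.size())    // list may shrink when a volume fails
          curVolume = 0;
        int startVolume = curVolume;
        while (true) {
          T volume = volumes.get(curVolume);
          curVolume = (curVolume + 1) % volumes.size();
          if (volume.getAvailable() > blockSize)
            return volume;                  // first volume in rotation with room
          if (curVolume == startVolume)     // full cycle without a fit
            throw new DiskOutOfSpaceException(
                "Insufficient space for an additional block");
        }
      }
    }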

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java Fri Apr 29 18:16:32 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.util.Daemon;
@@ -38,10 +39,12 @@ import org.apache.hadoop.util.Daemon;
 class UpgradeManagerDatanode extends UpgradeManager {
   DataNode dataNode = null;
   Daemon upgradeDaemon = null;
+  String bpid = null;
 
-  UpgradeManagerDatanode(DataNode dataNode) {
+  UpgradeManagerDatanode(DataNode dataNode, String bpid) {
     super();
     this.dataNode = dataNode;
+    this.bpid = bpid;
   }
 
   public HdfsConstants.NodeType getType() {
@@ -52,11 +55,11 @@ class UpgradeManagerDatanode extends Upg
     if( ! super.initializeUpgrade())
       return; // distr upgrade is not needed
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
-        + dataNode.dnRegistration.getName() 
+        + dataNode.getMachineName() 
         + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is initialized.");
     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
-    curUO.setDatanode(dataNode);
+    curUO.setDatanode(dataNode, this.bpid);
     upgradeState = curUO.preUpgradeAction(nsInfo);
     // upgradeState is true if the data-node should start the upgrade itself
   }
@@ -89,7 +92,8 @@ class UpgradeManagerDatanode extends Upg
           "UpgradeManagerDatanode.currentUpgrades is not null.";
         assert upgradeDaemon == null : 
           "UpgradeManagerDatanode.upgradeDaemon is not null.";
-        dataNode.namenode.processUpgradeCommand(broadcastCommand);
+        DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
+        nn.processUpgradeCommand(broadcastCommand);
         return true;
       }
     }
@@ -104,12 +108,12 @@ class UpgradeManagerDatanode extends Upg
     }
     upgradeState = true;
     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
-    curUO.setDatanode(dataNode);
+    curUO.setDatanode(dataNode, this.bpid);
     curUO.startUpgrade();
     upgradeDaemon = new Daemon(curUO);
     upgradeDaemon.start();
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
-        + dataNode.dnRegistration.getName() 
+        + dataNode.getMachineName() 
         + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is started.");
     return true;
@@ -124,7 +128,7 @@ class UpgradeManagerDatanode extends Upg
     if(startUpgrade()) // upgrade started
       return;
     throw new IOException(
-        "Distributed upgrade for DataNode " + dataNode.dnRegistration.getName() 
+        "Distributed upgrade for DataNode " + dataNode.getMachineName() 
         + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " cannot be started. "
         + "The upgrade object is not defined.");
@@ -139,7 +143,7 @@ class UpgradeManagerDatanode extends Upg
     currentUpgrades = null;
     upgradeDaemon = null;
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
-        + dataNode.dnRegistration.getName() 
+        + dataNode.getMachineName()
         + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is complete.");
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java Fri Apr 29 18:16:32 2011
@@ -35,6 +35,7 @@ import java.net.SocketTimeoutException;
 @InterfaceAudience.Private
 public abstract class UpgradeObjectDatanode extends UpgradeObject implements Runnable {
   private DataNode dataNode = null;
+  private String bpid = null;
 
   public HdfsConstants.NodeType getType() {
     return HdfsConstants.NodeType.DATA_NODE;
@@ -43,9 +44,14 @@ public abstract class UpgradeObjectDatan
   protected DataNode getDatanode() {
     return dataNode;
   }
+  
+  protected DatanodeProtocol getNamenode() throws IOException {
+    return dataNode.getBPNamenode(bpid);
+  }
 
-  void setDatanode(DataNode dataNode) {
+  void setDatanode(DataNode dataNode, String bpid) {
     this.dataNode = dataNode;
+    this.bpid = bpid;
   }
 
   /**
@@ -86,12 +92,14 @@ public abstract class UpgradeObjectDatan
             + "\n   " + getDescription() + "."
             + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
     DataNode.LOG.fatal( errorMsg );
+    String bpid = nsInfo.getBlockPoolID();
+    DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
     try {
-      dataNode.namenode.errorReport(dataNode.dnRegistration,
+      nn.errorReport(dataNode.getDNRegistrationForBP(bpid),
                                     DatanodeProtocol.NOTIFY, errorMsg);
     } catch(SocketTimeoutException e) {  // namenode is busy
       DataNode.LOG.info("Problem connecting to server: " 
-                        + dataNode.getNameNodeAddr());
+                        + dataNode.getNameNodeAddr(nsInfo.getBlockPoolID()));
     }
     throw new IOException(errorMsg);
   }
@@ -116,7 +124,10 @@ public abstract class UpgradeObjectDatan
 
     // Complete the upgrade by calling the manager method
     try {
-      dataNode.upgradeManager.completeUpgrade();
+      UpgradeManagerDatanode upgradeManager = 
+        DataNode.getUpgradeManagerDatanode(bpid);
+      if(upgradeManager != null)
+        upgradeManager.completeUpgrade();
     } catch(IOException e) {
       DataNode.LOG.error(StringUtils.stringifyException(e));
     }
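
Both upgrade classes now follow the same federation pattern: resolve a DatanodeProtocol proxy per block pool instead of relying on the single dataNode.namenode field. Condensed, using only calls that appear in the hunks above:

    // Per-block-pool RPC: every namenode interaction is routed through the
    // proxy registered for this bpid (calls as they appear in this commit).
    DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
    nn.errorReport(dataNode.getDNRegistrationForBP(bpid),
                   DatanodeProtocol.NOTIFY, errorMsg);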

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Fri Apr 29 18:16:32 2011
@@ -40,6 +40,13 @@ import org.apache.hadoop.classification.
 public interface FSDatasetMBean {
   
   /**
+   * Returns the total space (in bytes) used by a block pool
+   * @return  the total space used by a block pool
+   * @throws IOException
+   */  
+  public long getBlockPoolUsed(String bpid) throws IOException;
+  
+  /**
    * Returns the total space (in bytes) used by dfs datanode
    * @return  the total space used by dfs datanode
    * @throws IOException
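
A hypothetical implementation sketch of the new accessor; the perPoolUsed bookkeeping map is an assumption for illustration only, since FSDataset's actual per-pool accounting lives elsewhere in this changeset:

    // Hypothetical implementor of the new MBean method; perPoolUsed is an
    // assumed Map<String, Long> the dataset would maintain per block pool.
    public long getBlockPoolUsed(String bpid) throws IOException {
      Long used = perPoolUsed.get(bpid);
      if (used == null)
        throw new IOException("Unknown block pool: " + bpid);
      return used;
    }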

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Fri Apr 29 18:16:32 2011
@@ -68,7 +68,11 @@ public class BackupNode extends NameNode
   String nnHttpAddress;
   /** Checkpoint manager */
   Checkpointer checkpointManager;
-
+  /** ClusterID to which BackupNode belongs to */
+  String clusterId;
+  /** Block pool Id of the peer namenode of this BackupNode */
+  String blockPoolId;
+  
   BackupNode(Configuration conf, NamenodeRole role) throws IOException {
     super(conf, role);
   }
@@ -133,6 +137,10 @@ public class BackupNode extends NameNode
     // therefore lease hard limit should never expire.
     namesystem.leaseManager.setLeasePeriod(
         FSConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);
+    
+    clusterId = nsInfo.getClusterID();
+    blockPoolId = nsInfo.getBlockPoolID();
+
     // register with the active name-node 
     registerWith(nsInfo);
     // Checkpoint daemon should start after the rpc server started
@@ -366,4 +374,12 @@ public class BackupNode extends NameNode
       + FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
     return nsInfo;
   }
+  
+  String getBlockPoolId() {
+    return blockPoolId;
+  }
+  
+  String getClusterId() {
+    return clusterId;
+  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Apr 29 18:16:32 2011
@@ -443,7 +443,7 @@ public class BlockManager {
     if (!blk.isComplete()) {
       final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
       final DatanodeDescriptor[] locations = uc.getExpectedLocations();
-      return new LocatedBlock(uc, locations, pos, false);
+      return namesystem.createLocatedBlock(uc, locations, pos, false);
     }
 
     // get block locations
@@ -469,7 +469,7 @@ public class BlockManager {
           machines[j++] = d;
       }
     }
-    return new LocatedBlock(blk, machines, pos, isCorrupt);
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Fri Apr 29 18:16:32 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A unique signature intended to identify checkpoint transactions.
@@ -37,25 +38,30 @@ public class CheckpointSignature extends
   long editsTime = -1L;
   long checkpointTime = -1L;
   MD5Hash imageDigest = null;
+  String blockpoolID = "";
 
   public CheckpointSignature() {}
 
   CheckpointSignature(FSImage fsImage) {
     super(fsImage.getStorage());
+    blockpoolID = fsImage.getBlockPoolID();
     editsTime = fsImage.getEditLog().getFsEditTime();
     checkpointTime = fsImage.getStorage().getCheckpointTime();
     imageDigest = fsImage.getStorage().getImageDigest();
+    checkpointTime = fsImage.getStorage().getCheckpointTime();
   }
 
   CheckpointSignature(String str) {
     String[] fields = str.split(FIELD_SEPARATOR);
-    assert fields.length == 6 : "Must be 6 fields in CheckpointSignature";
+    assert fields.length == 8 : "Must be 8 fields in CheckpointSignature";
     layoutVersion = Integer.valueOf(fields[0]);
     namespaceID = Integer.valueOf(fields[1]);
     cTime = Long.valueOf(fields[2]);
     editsTime = Long.valueOf(fields[3]);
     checkpointTime = Long.valueOf(fields[4]);
     imageDigest = new MD5Hash(fields[5]);
+    clusterID = fields[6];
+    blockpoolID = fields[7];
   }
 
   /**
@@ -66,31 +72,64 @@ public class CheckpointSignature extends
     return imageDigest;
   }
 
+  /**
+   * Get the cluster id from CheckpointSignature
+   * @return the cluster id
+   */
+  public String getClusterID() {
+    return clusterID;
+  }
+
+  /**
+   * Get the block pool id from CheckpointSignature
+   * @return the block pool id
+   */
+  public String getBlockpoolID() {
+    return blockpoolID;
+  }
+
+  /**
+   * Set the block pool id of CheckpointSignature.
+   * 
+   * @param blockpoolID the new blockpool id
+   */
+  public void setBlockpoolID(String blockpoolID) {
+    this.blockpoolID = blockpoolID;
+  }
+  
   public String toString() {
     return String.valueOf(layoutVersion) + FIELD_SEPARATOR
          + String.valueOf(namespaceID) + FIELD_SEPARATOR
          + String.valueOf(cTime) + FIELD_SEPARATOR
          + String.valueOf(editsTime) + FIELD_SEPARATOR
          + String.valueOf(checkpointTime) + FIELD_SEPARATOR
-         +  imageDigest.toString();
+         + imageDigest.toString() + FIELD_SEPARATOR
+         + clusterID + FIELD_SEPARATOR
+         + blockpoolID ;
   }
 
   void validateStorageInfo(FSImage si) throws IOException {
-    if(layoutVersion != si.getStorage().layoutVersion
-       || namespaceID != si.getStorage().namespaceID 
-       || cTime != si.getStorage().cTime
-       || checkpointTime != si.getStorage().getCheckpointTime() ||
-       !imageDigest.equals(si.getStorage().getImageDigest())) {
+    if(layoutVersion != si.getLayoutVersion()
+        || namespaceID != si.getNamespaceID() 
+        || cTime != si.getStorage().cTime
+        || checkpointTime != si.getStorage().getCheckpointTime() 
+        || !imageDigest.equals(si.getStorage().imageDigest)
+        || !clusterID.equals(si.getClusterID())
+        || !blockpoolID.equals(si.getBlockPoolID())) {
       // checkpointTime can change when the image is saved - do not compare
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
           + " cTime = " + cTime + "; checkpointTime = " + checkpointTime
           + " ; imageDigest = " + imageDigest
+          + " ; clusterId = " + clusterID
+          + " ; blockpoolId = " + blockpoolID
           + ".\nExpecting respectively: "
-          + si.getStorage().layoutVersion + "; " 
-          + si.getStorage().namespaceID + "; " + si.getStorage().cTime
+          + si.getLayoutVersion() + "; " 
+          + si.getNamespaceID() + "; " + si.getStorage().cTime
           + "; " + si.getStorage().getCheckpointTime() + "; " 
-          + si.getStorage().getImageDigest());
+          + si.getStorage().imageDigest
+          + "; " + si.getClusterID() + "; " 
+          + si.getBlockPoolID() + ".");
     }
   }
 
@@ -106,6 +145,10 @@ public class CheckpointSignature extends
       (editsTime < o.editsTime) ? -1 : (editsTime > o.editsTime) ? 1 :
       (checkpointTime < o.checkpointTime) ? -1 : 
                   (checkpointTime > o.checkpointTime) ? 1 :
+      (clusterID.compareTo(o.clusterID) < 0) ? -1 : 
+                  (clusterID.compareTo(o.clusterID) > 0) ? 1 :
+      (blockpoolID.compareTo(o.blockpoolID) < 0) ? -1 : 
+                  (blockpoolID.compareTo(o.blockpoolID) > 0) ? 1 :
                     imageDigest.compareTo(o.imageDigest);
   }
 
@@ -119,7 +162,8 @@ public class CheckpointSignature extends
   public int hashCode() {
     return layoutVersion ^ namespaceID ^
             (int)(cTime ^ editsTime ^ checkpointTime) ^
-            imageDigest.hashCode();
+            imageDigest.hashCode() ^ clusterID.hashCode()
+            ^ blockpoolID.hashCode();
   }
 
   /////////////////////////////////////////////////
@@ -127,6 +171,7 @@ public class CheckpointSignature extends
   /////////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
     super.write(out);
+    WritableUtils.writeString(out, blockpoolID);
     out.writeLong(editsTime);
     out.writeLong(checkpointTime);
     imageDigest.write(out);
@@ -134,6 +179,7 @@ public class CheckpointSignature extends
 
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
+    blockpoolID = WritableUtils.readString(in);
     editsTime = in.readLong();
     checkpointTime = in.readLong();
     imageDigest = new MD5Hash();
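
The signature string now carries eight separated fields, which the string constructor above asserts. A round-trip sketch with illustrative values, assuming FIELD_SEPARATOR is ":" as defined in the class:

    // Field order per toString() above (all values illustrative):
    // layoutVersion : namespaceID : cTime : editsTime : checkpointTime
    //   : imageDigest : clusterID : blockpoolID
    String serialized = "-35:495135986:0:1304100992000:1304100992000:"
        + "d41d8cd98f00b204e9800998ecf8427e:CID-example:BP-example";
    CheckpointSignature sig = new CheckpointSignature(serialized);
    assert sig.getBlockpoolID().equals("BP-example");
    assert sig.getClusterID().equals("CID-example");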

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Fri Apr 29 18:16:32 2011
@@ -125,26 +125,30 @@ class Checkpointer extends Daemon {
   // The main work loop
   //
   public void run() {
-    final long fiveMinMS = 5 * 60 * 1000; // How often to poll edits size
-    final long periodMS = checkpointPeriod * 1000; // How often to checkpoint, regardless of edits' size
-    long lastCheckpointTimeMS = backupNode.shouldCheckpointAtStartup() ? 0 : now();
-    long lastSizeCheckMS = now();
+    // Check the size of the edit log once every 5 minutes.
+    long periodMSec = 5 * 60;   // 5 minutes
+    if(checkpointPeriod < periodMSec) {
+      periodMSec = checkpointPeriod;
+    }
+    periodMSec *= 1000;
 
+    long lastCheckpointTime = 0;
+    if(!backupNode.shouldCheckpointAtStartup())
+      lastCheckpointTime = now();
     while(shouldRun) {
       try {
         long now = now();
-        boolean editsTooBig = false;
-        boolean periodExpired = now >= lastCheckpointTimeMS + periodMS;
-
-        if(now >= lastSizeCheckMS + fiveMinMS) {
-          editsTooBig = getJournalSize() > checkpointSize;
-          lastSizeCheckMS = now;
+        boolean shouldCheckpoint = false;
+        if(now >= lastCheckpointTime + periodMSec) {
+          shouldCheckpoint = true;
+        } else {
+          long size = getJournalSize();
+          if(size >= checkpointSize)
+            shouldCheckpoint = true;
         }
-
-        if(periodExpired || editsTooBig) {
+        if(shouldCheckpoint) {
           doCheckpoint();
-          lastCheckpointTimeMS = now;
-          lastSizeCheckMS = now;
+          lastCheckpointTime = now;
         }
       } catch(IOException e) {
         LOG.error("Exception in doCheckpoint: ", e);
@@ -154,7 +158,7 @@ class Checkpointer extends Daemon {
         break;
       }
       try {
-        Thread.sleep(Math.min(fiveMinMS, periodMS));
+        Thread.sleep(periodMSec);
       } catch(InterruptedException ie) {
         // do nothing
       }
@@ -248,6 +252,8 @@ class Checkpointer extends Daemon {
     }
 
     BackupImage bnImage = getFSImage();
+    bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId());
+    bnImage.getStorage().setClusterID(backupNode.getClusterId());
     bnImage.loadCheckpoint(sig);
     sig.validateStorageInfo(bnImage);
     bnImage.saveCheckpoint();
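
The reworked loop condenses to a single trigger: checkpoint when the period has elapsed, otherwise when the journal has outgrown checkpointSize, polling every min(5 minutes, checkpointPeriod). One iteration, reduced for clarity (exception handling elided):

    // One iteration of the new work loop; the short-circuit || means the
    // journal size is only queried when the period has not yet expired.
    long now = now();
    boolean shouldCheckpoint =
        now >= lastCheckpointTime + periodMSec      // period expired
        || getJournalSize() >= checkpointSize;      // or edits grew too large
    if (shouldCheckpoint) {
      doCheckpoint();
      lastCheckpointTime = now;
    }
    Thread.sleep(periodMSec);                       // poll interval <= 5 minutes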

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Apr 29 18:16:32 2011
@@ -26,10 +26,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableUtils;
@@ -121,7 +117,6 @@ public class DatanodeDescriptor extends 
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
-  
   /** 
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode
@@ -135,7 +130,7 @@ public class DatanodeDescriptor extends 
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
-    this(nodeID, 0L, 0L, 0L, 0);
+    this(nodeID, 0L, 0L, 0L, 0L, 0);
   }
 
   /** DatanodeDescriptor constructor
@@ -157,7 +152,7 @@ public class DatanodeDescriptor extends 
   public DatanodeDescriptor(DatanodeID nodeID, 
                             String networkLocation,
                             String hostName) {
-    this(nodeID, networkLocation, hostName, 0L, 0L, 0L, 0);
+    this(nodeID, networkLocation, hostName, 0L, 0L, 0L, 0L, 0);
   }
   
   /** DatanodeDescriptor constructor
@@ -165,16 +160,18 @@ public class DatanodeDescriptor extends 
    * @param nodeID id of the data node
    * @param capacity capacity of the data node
    * @param dfsUsed space used by the data node
-   * @param remaining remaing capacity of the data node
+   * @param remaining remaining capacity of the data node
+   * @param bpused space used by the block pool corresponding to this namenode
    * @param xceiverCount # of data transfers at the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID, 
                             long capacity,
                             long dfsUsed,
                             long remaining,
+                            long bpused,
                             int xceiverCount) {
     super(nodeID);
-    updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
+    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount);
   }
 
   /** DatanodeDescriptor constructor
@@ -184,6 +181,7 @@ public class DatanodeDescriptor extends 
    * @param capacity capacity of the data node, including space used by non-dfs
    * @param dfsUsed the used space by dfs datanode
    * @param remaining remaining capacity of the data node
+   * @param bpused space used by the block pool corresponding to this namenode
    * @param xceiverCount # of data transfers at the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID,
@@ -192,9 +190,10 @@ public class DatanodeDescriptor extends 
                             long capacity,
                             long dfsUsed,
                             long remaining,
+                            long bpused,
                             int xceiverCount) {
     super(nodeID, networkLocation, hostName);
-    updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
+    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount);
   }
 
   /**
@@ -250,6 +249,7 @@ public class DatanodeDescriptor extends 
   void resetBlocks() {
     this.capacity = 0;
     this.remaining = 0;
+    this.blockPoolUsed = 0;
     this.dfsUsed = 0;
     this.xceiverCount = 0;
     this.blockList = null;
@@ -263,10 +263,11 @@ public class DatanodeDescriptor extends 
   /**
    */
   void updateHeartbeat(long capacity, long dfsUsed, long remaining,
-      int xceiverCount) {
+      long blockPoolUsed, int xceiverCount) {
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
     this.remaining = remaining;
+    this.blockPoolUsed = blockPoolUsed;
     this.lastUpdate = System.currentTimeMillis();
     this.xceiverCount = xceiverCount;
     rollBlocksScheduled(lastUpdate);
@@ -353,31 +354,22 @@ public class DatanodeDescriptor extends 
     }
   }
   
-  BlockCommand getReplicationCommand(int maxTransfers) {
-    List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
-    return blocktargetlist == null? null:
-        new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
+  List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
+    return replicateBlocks.poll(maxTransfers);
   }
 
-  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+  BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
       return null;
-    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
-    for(BlockInfoUnderConstruction b : blocks) {
-      brCommand.add(new RecoveringBlock(
-          b, b.getExpectedLocations(), b.getBlockRecoveryId()));
-    }
-    return brCommand;
+    return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
   }
 
   /**
    * Remove the specified number of blocks to be invalidated
    */
-  BlockCommand getInvalidateBlocks(int maxblocks) {
-    Block[] deleteList = getBlockArray(invalidateBlocks, maxblocks); 
-    return deleteList == null? 
-        null: new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList);
+  Block[] getInvalidateBlocks(int maxblocks) {
+    return getBlockArray(invalidateBlocks, maxblocks); 
   }
 
   static private Block[] getBlockArray(Collection<Block> blocks, int max) {
@@ -427,6 +419,7 @@ public class DatanodeDescriptor extends 
     this.capacity = in.readLong();
     this.dfsUsed = in.readLong();
     this.remaining = in.readLong();
+    this.blockPoolUsed = in.readLong();
     this.lastUpdate = in.readLong();
     this.xceiverCount = in.readInt();
     this.location = Text.readString(in);
@@ -542,6 +535,18 @@ public class DatanodeDescriptor extends 
   }
   
   /**
+   * Set the flag to indicate if this datanode is disallowed from communicating
+   * with the namenode.
+   */
+  void setDisallowed(boolean flag) {
+    disallowed = flag;
+  }
+  
+  boolean isDisallowed() {
+    return disallowed;
+  }
+
+  /**
    * @return number of failed volumes in the datanode.
    */
   public int getVolumeFailures() {
@@ -556,16 +561,4 @@ public class DatanodeDescriptor extends 
     super.updateRegInfo(nodeReg);
     volumeFailures = 0;
   }
-  
-  /**
-   * Set the flag to indicate if this datanode is disallowed from communicating
-   * with the namenode.
-   */
-  void setDisallowed(boolean flag) {
-    disallowed = flag;
-  }
-  
-  boolean isDisallowed() {
-    return disallowed;
-  }
 }
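
The descriptor no longer builds protocol commands itself; it hands back raw block lists so the namesystem, which knows the blockPoolId, can wrap them. The corresponding wrapping, as it appears in the FSNamesystem hunk later in this commit:

    // Namesystem-side wrapping of the raw data returned above.
    List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
        blockManager.maxReplicationStreams - xmitsInProgress);
    if (pendingList != null) {
      cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
                                pendingList));
    }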

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Fri Apr 29 18:16:32 2011
@@ -85,7 +85,7 @@ abstract class DfsServlet extends HttpSe
     return DFSClient.createNamenode(nnAddr, conf);
   }
 
-  /** Create a URI for redirecting request */
+  /** Create a URI for redirecting request to a datanode */
   protected URI createRedirectUri(String servletpath, 
                                   UserGroupInformation ugi,
                                   DatanodeID host, 
@@ -109,6 +109,10 @@ abstract class DfsServlet extends HttpSe
       params.append("&ugi=");
       params.append(ugi.getShortUserName());
     }
+    
+    // Add namenode address to the URL params
+    String nnAddr = NameNode.getHostPortString(nn.getNameNodeAddress());
+    params.append(JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr));
     return new URI(scheme, null, hostname, port, servletpath,
                    params.toString(), null);
   }
@@ -125,6 +129,6 @@ abstract class DfsServlet extends HttpSe
   
   protected UserGroupInformation getUGI(HttpServletRequest request,
                                         Configuration conf) throws IOException {
-    return JspHelper.getUGI(request, conf);
+    return JspHelper.getUGI(getServletContext(), request, conf);
   }
 }
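
With the extra parameter, a redirect URI now also identifies which namenode the request belongs to. An illustrative shape (hosts, ports, servlet path, and the parameter name are assumptions; the name comes from JspHelper.NAMENODE_ADDRESS):

    http://datanode.example.com:50075/streamFile?ugi=alice&nnaddr=nn.example.com:8020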

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Apr 29 18:16:32 2011
@@ -154,7 +154,7 @@ class FSDirectory implements Closeable {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
       fsImage.getStorage().setStorageDirectories(dataDirs, editsDirs);
-      fsImage.getStorage().format();
+      fsImage.getStorage().format(fsImage.getStorage().determineClusterId()); // reuse current id
       startOpt = StartupOption.REGULAR;
     }
     try {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Apr 29 18:16:32 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -139,6 +140,10 @@ public class FSImage implements NNStorag
     storage.setStorageDirectories(fsDirs, fsEditsDirs);
   }
 
+  public FSImage(StorageInfo storageInfo, String bpid) {
+    storage = new NNStorage(storageInfo, bpid);
+  }
+
   /**
    * Represents an Image (image and edit file).
    * @throws IOException 
@@ -251,12 +256,31 @@ public class FSImage implements NNStorag
     }
     if (startOpt != StartupOption.UPGRADE
         && storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
-        && storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION)
+        && storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
       throw new IOException(
           "\nFile system image contains an old layout version " 
           + storage.getLayoutVersion() + ".\nAn upgrade to version "
           + FSConstants.LAYOUT_VERSION + " is required.\n"
           + "Please restart NameNode with -upgrade option.");
+    }
+    
+    // Upgrade to federation requires -upgrade -clusterid <clusterID> option
+    if (startOpt == StartupOption.UPGRADE
+        && storage.getLayoutVersion() > Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      if (startOpt.getClusterId() == null) {
+        throw new IOException(
+            "\nFile system image contains an old layout version "
+                + storage.getLayoutVersion() + ".\nAn upgrade to version "
+                + FSConstants.LAYOUT_VERSION
+                + " is required.\nPlease restart NameNode with "
+                + "-upgrade -clusterid <clusterID> option.");
+      }
+      storage.setClusterID(startOpt.getClusterId());
+      
+      // Create new block pool Id
+      storage.setBlockPoolID(storage.newBlockPoolID());
+    }
+    
     // check whether distributed upgrade is reguired and/or should be continued
     storage.verifyDistributedUpgradeProgress(startOpt);
 
@@ -456,6 +480,7 @@ public class FSImage implements NNStorag
     realImage.getStorage().setStorageInfo(ckptImage.getStorage());
     storage.setCheckpointTime(ckptImage.getStorage().getCheckpointTime());
     fsNamesys.dir.fsImage = realImage;
+    realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
     // and save it but keep the same checkpointTime
     saveNamespace(false);
   }
@@ -659,7 +684,7 @@ public class FSImage implements NNStorag
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, getFSNamesystem());
     loader.load(curFile);
-
+    namesystem.setBlockPoolId(this.getBlockPoolID());
 
     // Check that the image digest we loaded matches up with what
     // we expected
@@ -1123,6 +1148,7 @@ public class FSImage implements NNStorag
     storage.close();
   }
 
+
   /**
    * Retrieve checkpoint dirs from configuration.
    *
@@ -1177,4 +1203,20 @@ public class FSImage implements NNStorag
   public void directoryAvailable(StorageDirectory sd) throws IOException {
     // do nothing
   }
+
+  public int getLayoutVersion() {
+    return storage.getLayoutVersion();
+  }
+  
+  public int getNamespaceID() {
+    return storage.getNamespaceID();
+  }
+  
+  public String getClusterID() {
+    return storage.getClusterID();
+  }
+  
+  public String getBlockPoolID() {
+    return storage.getBlockPoolID();
+  }
 }
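
Operationally, upgrading a pre-federation image therefore needs an explicit cluster id on the command line, matching the error text above (the id value is illustrative):

    bin/hdfs namenode -upgrade -clusterid CID-example-cluster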

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Apr 29 18:16:32 2011
@@ -52,10 +52,14 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -63,6 +67,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -113,7 +118,7 @@ import javax.management.MBeanServer;
  * 1)  valid fsname --> blocklist  (kept on disk, logged)
  * 2)  Set of all valid blocks (inverted #1)
  * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
- * 4)  machine --> blocklist (inverted #3)
+ * 4)  machine --> blocklist (inverted #2)
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
 @InterfaceAudience.Private
@@ -174,6 +179,7 @@ public class FSNamesystem implements FSC
   // FSNamesystemMetrics counter variables
   private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
+  private long blockPoolUsed = 0L;
   private int totalLoad = 0;
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
@@ -189,8 +195,10 @@ public class FSNamesystem implements FSC
   // Stores the correct file name hierarchy
   //
   public FSDirectory dir;
-
   BlockManager blockManager;
+  
+  // Block pool ID used by this namenode
+  String blockPoolId;
     
   /**
    * Stores the datanode -> block map.  
@@ -554,6 +562,8 @@ public class FSNamesystem implements FSC
   
   NamespaceInfo getNamespaceInfo() {
     return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+                             getClusterId(),
+                             getBlockPoolId(),
                              dir.fsImage.getStorage().getCTime(),
                              getDistributedUpgradeVersion());
   }
@@ -923,34 +933,42 @@ public class FSNamesystem implements FSC
       if (LOG.isDebugEnabled()) {
         LOG.debug("last = " + last);
       }
-
-      if(isBlockTokenEnabled && needBlockToken) {
+      
+      LocatedBlock lastBlock = last.isComplete() ? blockManager
+          .getBlockLocation(last, n - last.getNumBytes()) : blockManager
+          .getBlockLocation(last, n);
+          
+      if (isBlockTokenEnabled && needBlockToken) {
         setBlockTokens(locatedblocks);
+        setBlockToken(lastBlock);
       }
-
-      if (last.isComplete()) {
-        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-          blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
-      } else {
-        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-          blockManager.getBlockLocation(last, n), false);
-      }
+      return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+          lastBlock, last.isComplete());
     }
     } finally {
       readUnlock();
     }
   }
 
+  /** Create a LocatedBlock. */
+  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+      final long offset, final boolean corrupt) throws IOException {
+    return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
+  }
+  
   /** Generate block tokens for the blocks to be returned. */
   private void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
     for(LocatedBlock l : locatedBlocks) {
-      Token<BlockTokenIdentifier> token = 
-        blockTokenSecretManager.generateToken(l.getBlock(), 
-            EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
-    
-      l.setBlockToken(token);
+      setBlockToken(l);
     }
   }
+  
+  /** Generate block token for a LocatedBlock. */
+  private void setBlockToken(LocatedBlock l) throws IOException {
+    Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
+        .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
+    l.setBlockToken(token);
+  }
 
   /**
    * Moves all the blocks from srcs and appends them to trg
@@ -1547,6 +1565,14 @@ public class FSNamesystem implements FSC
     return lb;
   }
 
+  ExtendedBlock getExtendedBlock(Block blk) {
+    return new ExtendedBlock(blockPoolId, blk);
+  }
+  
+  void setBlockPoolId(String bpid) {
+    blockPoolId = bpid;
+  }
+
   /**
    * The client would like to obtain an additional block for the indicated
    * filename (which is being written-to).  Return an array that consists
@@ -1560,12 +1586,13 @@ public class FSNamesystem implements FSC
    */
   public LocatedBlock getAdditionalBlock(String src,
                                          String clientName,
-                                         Block previous,
+                                         ExtendedBlock previous,
                                          HashMap<Node, Node> excludedNodes
                                          ) 
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
+    checkBlock(previous);
     long fileLength, blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
@@ -1589,7 +1616,8 @@ public class FSNamesystem implements FSC
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
 
       // commit the last block and complete it if it has minimum replicas
-      blockManager.commitOrCompleteLastBlock(pendingFile, previous);
+      blockManager.commitOrCompleteLastBlock(pendingFile, ExtendedBlock
+          .getLocalBlock(previous));
 
       //
       // If we fail this, bad things happen!
@@ -1638,7 +1666,7 @@ public class FSNamesystem implements FSC
     }
         
     // Create next block
-    LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
+    LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
     if (isBlockTokenEnabled) {
       b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(), 
           EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
@@ -1646,8 +1674,8 @@ public class FSNamesystem implements FSC
     return b;
   }
 
-  /** @see NameNode#getAdditionalDatanode(String, Block, DatanodeInfo[], DatanodeInfo[], int, String) */
-  LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+  /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
+  LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
       final DatanodeInfo[] existings,  final HashMap<Node, Node> excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
@@ -1696,7 +1724,7 @@ public class FSNamesystem implements FSC
   /**
    * The client would like to let go of the given block
    */
-  public boolean abandonBlock(Block b, String src, String holder)
+  public boolean abandonBlock(ExtendedBlock b, String src, String holder)
       throws LeaseExpiredException, FileNotFoundException,
       UnresolvedLinkException, IOException {
     writeLock();
@@ -1709,7 +1737,7 @@ public class FSNamesystem implements FSC
                                     +b+"of file "+src);
     }
     INodeFileUnderConstruction file = checkLease(src, holder);
-    dir.removeBlock(src, file, b);
+    dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                     + b
@@ -1761,9 +1789,11 @@ public class FSNamesystem implements FSC
    *         (e.g if not all blocks have reached minimum replication yet)
    * @throws IOException on error (eg lease mismatch, file not open, file deleted)
    */
-  public boolean completeFile(String src, String holder, Block last) 
+  public boolean completeFile(String src, String holder, ExtendedBlock last) 
     throws SafeModeException, UnresolvedLinkException, IOException {
-    boolean success = completeFileInternal(src, holder, last);
+    checkBlock(last);
+    boolean success = completeFileInternal(src, holder, 
+        ExtendedBlock.getLocalBlock(last));
     getEditLog().logSync();
     return success ;
   }
@@ -1831,7 +1861,7 @@ public class FSNamesystem implements FSC
     b.setGenerationStamp(getGenerationStamp());
     b = dir.addBlock(src, inodes, b, targets);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
-                                 +src+ ". "+b);
+                                 +src+ ". " + blockPoolId + " "+ b);
     return b;
   }
 
@@ -1879,15 +1909,14 @@ public class FSNamesystem implements FSC
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
    */
-  public void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
+  public void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
     throws IOException {
     writeLock();
     try {
-      blockManager.findAndMarkBlockAsCorrupt(blk, dn);
+      blockManager.findAndMarkBlockAsCorrupt(blk.getLocalBlock(), dn);
     } finally {
       writeUnlock();
     }
-
   }
 
 
@@ -2396,7 +2425,7 @@ public class FSNamesystem implements FSC
     checkReplicationFactor(newFile);
   }
 
-  void commitBlockSynchronization(Block lastblock,
+  void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException, UnresolvedLinkException {
@@ -2410,7 +2439,8 @@ public class FSNamesystem implements FSC
           + ", closeFile=" + closeFile
           + ", deleteBlock=" + deleteblock
           + ")");
-    final BlockInfo storedBlock = blockManager.getStoredBlock(lastblock);
+    final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
+        .getLocalBlock(lastblock));
     if (storedBlock == null) {
       throw new IOException("Block (=" + lastblock + ") not found");
     }
@@ -2432,7 +2462,7 @@ public class FSNamesystem implements FSC
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
     if (deleteblock) {
-      pendingFile.removeLastBlock(lastblock);
+      pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
       blockManager.removeBlockFromMap(storedBlock);
     }
     else {
@@ -2557,7 +2587,7 @@ public class FSNamesystem implements FSC
   * namespaceID and will continue serving the datanodes that have previously
    * registered with the namenode without restarting the whole cluster.
    * 
-   * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
+   * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
   public void registerDatanode(DatanodeRegistration nodeReg
                                             ) throws IOException {
@@ -2644,7 +2674,7 @@ public class FSNamesystem implements FSC
         if( !heartbeats.contains(nodeS)) {
           heartbeats.add(nodeS);
           //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0);
+          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0);
           nodeS.isAlive = true;
         }
       }
@@ -2764,7 +2794,7 @@ public class FSNamesystem implements FSC
    * @throws IOException
    */
   DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
-      long capacity, long dfsUsed, long remaining,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress) throws IOException {
     DatanodeCommand cmd = null;
     synchronized (heartbeats) {
@@ -2787,25 +2817,37 @@ public class FSNamesystem implements FSC
         }
 
         updateStats(nodeinfo, false);
-        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
+        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
+            xceiverCount);
         updateStats(nodeinfo, true);
         
         //check lease recovery
-        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (cmd != null) {
-          return new DatanodeCommand[] {cmd};
+        BlockInfoUnderConstruction[] blocks = nodeinfo
+            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (blocks != null) {
+          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
+              blocks.length);
+          for (BlockInfoUnderConstruction b : blocks) {
+            brCommand.add(new RecoveringBlock(
+                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(),
+                b.getBlockRecoveryId()));
+          }
+          return new DatanodeCommand[] { brCommand };
         }
       
         ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
         //check pending replication
-        cmd = nodeinfo.getReplicationCommand(
+        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
               blockManager.maxReplicationStreams - xmitsInProgress);
-        if (cmd != null) {
+        if (pendingList != null) {
+          cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+              pendingList);
           cmds.add(cmd);
         }
         //check block invalidation
-        cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
-        if (cmd != null) {
+        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        if (blks != null) {
+          cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
+              blks);
           cmds.add(cmd);
         }
         // check access key update
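
With this change, handleHeartbeat no longer receives pre-built commands from the datanode descriptor; it assembles pool-qualified BlockRecoveryCommand and BlockCommand objects itself, stamping each with blockPoolId. A hedged sketch of the datanode-side dispatch these commands feed into; the three processing helpers are hypothetical, and only the command classes and the DNA_TRANSFER/DNA_INVALIDATE constants appear in the patch (DNA_RECOVERBLOCK is assumed to be the matching recovery action):

    // Sketch of a datanode-side dispatcher; transferBlocks, invalidateBlocks
    // and recoverBlocks are hypothetical helpers for illustration.
    void processCommands(DatanodeCommand[] cmds) throws IOException {
      if (cmds == null) {
        return;
      }
      for (DatanodeCommand cmd : cmds) {
        switch (cmd.getAction()) {
        case DatanodeProtocol.DNA_TRANSFER:      // replicate to the given targets
          transferBlocks((BlockCommand) cmd);
          break;
        case DatanodeProtocol.DNA_INVALIDATE:    // delete the listed replicas
          invalidateBlocks((BlockCommand) cmd);
          break;
        case DatanodeProtocol.DNA_RECOVERBLOCK:  // begin lease recovery
          recoverBlocks((BlockRecoveryCommand) cmd);
          break;
        }
      }
    }
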
@@ -2836,6 +2878,7 @@ public class FSNamesystem implements FSC
     assert(Thread.holdsLock(heartbeats));
     if (isAdded) {
       capacityUsed += node.getDfsUsed();
+      blockPoolUsed += node.getBlockPoolUsed();
       totalLoad += node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
         capacityTotal += node.getCapacity();
@@ -2845,6 +2888,7 @@ public class FSNamesystem implements FSC
       }
     } else {
       capacityUsed -= node.getDfsUsed();
+      blockPoolUsed -= node.getBlockPoolUsed();
       totalLoad -= node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
         capacityTotal -= node.getCapacity();
@@ -3195,7 +3239,7 @@ public class FSNamesystem implements FSC
    * The given node is reporting all its blocks.  Use this info to 
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public void processReport(DatanodeID nodeID, 
+  public void processReport(DatanodeID nodeID, String poolId,
       BlockListAsLongs newReport) throws IOException {
     long startTime, endTime;
 
@@ -3335,6 +3379,7 @@ public class FSNamesystem implements FSC
    * The given node is reporting that it received a certain block.
    */
   public void blockReceived(DatanodeID nodeID,  
+                                         String poolId,
                                          Block block,
                                          String delHint
                                          ) throws IOException {
@@ -3359,6 +3404,13 @@ public class FSNamesystem implements FSC
     }
   }
 
+  private void checkBlock(ExtendedBlock block) throws IOException {
+    if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
+      throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+          + " - expected " + blockPoolId);
+    }
+  }
+
   public long getMissingBlocksCount() {
     // not locking
     return blockManager.getMissingBlocksCount();
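
The checkBlock method introduced in the hunk above is the guard that makes the ExtendedBlock plumbing safe: a block arriving from a client must carry this namesystem's pool id or be rejected before any locks are taken, as completeFile already does. A short usage sketch; the pool id and block numbers below are made-up illustrative values:

    // Hypothetical values; a real pool id comes from NNStorage.newBlockPoolID().
    ExtendedBlock last = new ExtendedBlock("BP-1234567890-10.0.0.1-1304000000000",
        new Block(42L, 0L, 1001L));
    checkBlock(last);  // throws IOException unless the pool id equals blockPoolId
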
@@ -3370,33 +3422,36 @@ public class FSNamesystem implements FSC
                          this.capacityRemaining,
                          getUnderReplicatedBlocks(),
                          getCorruptReplicaBlocks(),
-                         getMissingBlocksCount()};
+                         getMissingBlocksCount(),
+                         getBlockPoolUsedSpace()};
     }
   }
 
   /**
    * Total raw bytes including non-dfs used space.
    */
+  @Override // FSNamesystemMBean
   public long getCapacityTotal() {
-    return getStats()[0];
+    synchronized(heartbeats) {
+      return capacityTotal;
+    }
   }
 
   /**
    * Total used space by data nodes
    */
+  @Override // FSNamesystemMBean
   public long getCapacityUsed() {
-    return getStats()[1];
+    synchronized(heartbeats) {
+      return capacityUsed;
+    }
   }
   /**
    * Total used space by data nodes as percentage of total capacity
    */
   public float getCapacityUsedPercent() {
     synchronized(heartbeats){
-      if (capacityTotal <= 0) {
-        return 100;
-      }
-
-      return ((float)capacityUsed * 100.0f)/(float)capacityTotal;
+      return DFSUtil.getPercentUsed(capacityUsed, capacityTotal);
     }
   }
   /**
@@ -3414,7 +3469,9 @@ public class FSNamesystem implements FSC
    * Total non-used raw bytes.
    */
   public long getCapacityRemaining() {
-    return getStats()[2];
+    synchronized(heartbeats) {
+      return capacityRemaining;
+    }
   }
 
   /**
@@ -3422,16 +3479,13 @@ public class FSNamesystem implements FSC
    */
   public float getCapacityRemainingPercent() {
     synchronized(heartbeats){
-      if (capacityTotal <= 0) {
-        return 0;
-      }
-
-      return ((float)capacityRemaining * 100.0f)/(float)capacityTotal;
+      return DFSUtil.getPercentRemaining(capacityRemaining, capacityTotal);
     }
   }
   /**
    * Total number of connections.
    */
+  @Override // FSNamesystemMBean
   public int getTotalLoad() {
     synchronized (heartbeats) {
       return this.totalLoad;
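
The two percent getters above now delegate to DFSUtil instead of open-coding the arithmetic. Judging from the inline code they replace, the helpers presumably look roughly like this; a reconstruction, not the committed DFSUtil source:

    // Inferred from the deleted inline arithmetic; edge-case handling matches
    // the old code (capacity <= 0 reports 100% used and 0% remaining).
    public static float getPercentUsed(long used, long capacity) {
      return capacity <= 0 ? 100 : (used * 100.0f) / capacity;
    }

    public static float getPercentRemaining(long remaining, long capacity) {
      return capacity <= 0 ? 0 : (remaining * 100.0f) / capacity;
    }
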
@@ -4368,6 +4422,7 @@ public class FSNamesystem implements FSC
   /**
    * Get the total number of blocks in the system. 
    */
+  @Override // FSNamesystemMBean
   public long getBlocksTotal() {
     return blockManager.getTotalBlocks();
   }
@@ -4641,14 +4696,17 @@ public class FSNamesystem implements FSC
     return maxFsObjects;
   }
 
+  @Override // FSNamesystemMBean
   public long getFilesTotal() {
     return this.dir.totalInodes();
   }
 
+  @Override // FSNamesystemMBean
   public long getPendingReplicationBlocks() {
     return blockManager.pendingReplicationBlocksCount;
   }
 
+  @Override // FSNamesystemMBean
   public long getUnderReplicatedBlocks() {
     return blockManager.underReplicatedBlocksCount;
   }
@@ -4663,6 +4721,7 @@ public class FSNamesystem implements FSC
     return blockManager.corruptReplicaBlocksCount;
   }
 
+  @Override // FSNamesystemMBean
   public long getScheduledReplicationBlocks() {
     return blockManager.scheduledReplicationBlocksCount;
   }
@@ -4679,6 +4738,7 @@ public class FSNamesystem implements FSC
     return blockManager.getCapacity();
   }
 
+  @Override // FSNamesystemMBean
   public String getFSState() {
     return isInSafeMode() ? "safeMode" : "Operational";
   }
@@ -4725,6 +4785,7 @@ public class FSNamesystem implements FSC
    * Number of live data nodes
    * @return Number of live data nodes
    */
+  @Override // FSNamesystemMBean
   public int getNumLiveDataNodes() {
     int numLive = 0;
     synchronized (datanodeMap) {   
@@ -4744,6 +4805,7 @@ public class FSNamesystem implements FSC
    * Number of dead data nodes
    * @return Number of dead data nodes
    */
+  @Override // FSNamesystemMBean
   public int getNumDeadDataNodes() {
     int numDead = 0;
     synchronized (datanodeMap) {   
@@ -4781,15 +4843,15 @@ public class FSNamesystem implements FSC
     return gs;
   }
 
-  private INodeFileUnderConstruction checkUCBlock(Block block, String clientName) 
-  throws IOException {
+  private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
+      String clientName) throws IOException {
     // check safe mode
     if (isInSafeMode())
       throw new SafeModeException("Cannot get a new generation stamp and an " +
                                 "access token for block " + block, safeMode);
     
     // check stored block state
-    BlockInfo storedBlock = blockManager.getStoredBlock(block);
+    BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
     if (storedBlock == null || 
         storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
         throw new IOException(block + 
@@ -4800,7 +4862,7 @@ public class FSNamesystem implements FSC
     INodeFile file = storedBlock.getINode();
     if (file==null || !file.isUnderConstruction()) {
       throw new IOException("The file " + storedBlock + 
-          " is belonged to does not exist or it is not under construction.");
+          " belonged to does not exist or it is not under construction.");
     }
     
     // check lease
@@ -4825,7 +4887,7 @@ public class FSNamesystem implements FSC
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  LocatedBlock updateBlockForPipeline(Block block, 
+  LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
       String clientName) throws IOException {
     writeLock();
     try {
@@ -4855,8 +4917,8 @@ public class FSNamesystem implements FSC
    * @param newNodes datanodes in the pipeline
    * @throws IOException if any error occurs
    */
-  void updatePipeline(String clientName, Block oldBlock, 
-      Block newBlock, DatanodeID[] newNodes)
+  void updatePipeline(String clientName, ExtendedBlock oldBlock, 
+      ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
     writeLock();
     try {
@@ -5428,6 +5490,20 @@ public class FSNamesystem implements FSC
   }
 
   @Override // NameNodeMXBean
+  public long getBlockPoolUsedSpace() {
+    synchronized(heartbeats) {
+      return blockPoolUsed;
+    }
+  }
+
+  @Override // NameNodeMXBean
+  public float getPercentBlockPoolUsed() {
+    synchronized(heartbeats) {
+      return DFSUtil.getPercentUsed(blockPoolUsed, capacityTotal);
+    }
+  }
+
+  @Override // NameNodeMXBean
   public float getPercentRemaining() {
     return getCapacityRemainingPercent();
   }
@@ -5513,4 +5589,14 @@ public class FSNamesystem implements FSC
   private long getDfsUsed(DatanodeDescriptor alivenode) {
     return alivenode.getDfsUsed();
   }
+
+  @Override  // NameNodeMXBean
+  public String getClusterId() {
+    return dir.fsImage.getStorage().getClusterID();
+  }
+  
+  @Override  // NameNodeMXBean
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
 }
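
The new NameNodeMXBean getters (getBlockPoolUsedSpace, getPercentBlockPoolUsed, getClusterId, getBlockPoolId) expose the federation state over JMX. A sketch of reading them remotely, assuming the bean is registered under the usual Hadoop:service=NameNode,name=NameNodeInfo object name and that a JMX connector port is open; both are assumptions to verify against a running NameNode:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class BlockPoolStats {
      public static void main(String[] args) throws Exception {
        // Port 8004 is a placeholder; use whatever the NameNode JVM exposes.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:8004/jmxrmi");
        JMXConnector jmxc = JMXConnectorFactory.connect(url);
        try {
          MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
          ObjectName nn =
              new ObjectName("Hadoop:service=NameNode,name=NameNodeInfo");
          System.out.println("ClusterId = "
              + mbsc.getAttribute(nn, "ClusterId"));
          System.out.println("BlockPoolId = "
              + mbsc.getAttribute(nn, "BlockPoolId"));
          System.out.println("BlockPoolUsedSpace = "
              + mbsc.getAttribute(nn, "BlockPoolUsedSpace"));
          System.out.println("PercentBlockPoolUsed = "
              + mbsc.getAttribute(nn, "PercentBlockPoolUsed"));
        } finally {
          jmxc.close();
        }
      }
    }
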

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Fri Apr 29 18:16:32 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
 
 import javax.net.SocketFactory;
 import javax.servlet.ServletContext;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.net.NetUtils;
@@ -89,19 +89,18 @@ public class FileChecksumServlets {
       final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
       xml.declaration();
 
-      final Configuration conf = new HdfsConfiguration(DataNode.getDataNode().getConf());
-      final int socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
-      final SocketFactory socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
+      final Configuration conf = 
+        new HdfsConfiguration(DataNode.getDataNode().getConf());
+      final int socketTimeout = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+          HdfsConstants.READ_TIMEOUT);
+      final SocketFactory socketFactory = NetUtils.getSocketFactory(conf,
+          ClientProtocol.class);
       
       try {
-        ClientProtocol nnproxy = getUGI(request, conf).doAs
-        (new PrivilegedExceptionAction<ClientProtocol>() {
-          @Override
-          public ClientProtocol run() throws IOException {
-            return DFSClient.createNamenode(conf);
-          }
-        });
-        
+        final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
+            DataNode.getDataNode(), conf, getUGI(request, conf));
+        final ClientProtocol nnproxy = dfs.getNamenode();
         final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
             filename, nnproxy, socketFactory, socketTimeout);
         MD5MD5CRC32FileChecksum.write(xml, checksum);

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Fri Apr 29 18:16:32 2011
@@ -49,7 +49,9 @@ public class FileDataServlet extends Dfs
       ClientProtocol nnproxy, HttpServletRequest request, String dt)
       throws IOException, URISyntaxException {
     String scheme = request.getScheme();
-    final DatanodeID host = pickSrcDatanode(parent, i, nnproxy);
+    final LocatedBlocks blks = nnproxy.getBlockLocations(
+        i.getFullPath(new Path(parent)).toUri().getPath(), 0, 1);
+    final DatanodeID host = pickSrcDatanode(blks, i);
     final String hostname;
     if (host instanceof DatanodeInfo) {
       hostname = ((DatanodeInfo)host).getHostName();
@@ -62,22 +64,25 @@ public class FileDataServlet extends Dfs
       dtParam=JspHelper.getDelegationTokenUrlParam(dt);
     }
 
+    // Add namenode address to the url params
+    NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+    String addr = NameNode.getHostPortString(nn.getNameNodeAddress());
+    String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
+    
     return new URI(scheme, null, hostname,
         "https".equals(scheme)
           ? (Integer)getServletContext().getAttribute("datanode.https.port")
           : host.getInfoPort(),
             "/streamFile" + i.getFullName(parent), 
-            "ugi=" + ugi.getShortUserName() + dtParam, null);
+            "ugi=" + ugi.getShortUserName() + dtParam + addrParam, null);
   }
 
   /** Select a datanode to service this request.
   * Currently, this looks at no more than the first block of a file,
    * selecting a datanode randomly from the most represented.
    */
-  private DatanodeID pickSrcDatanode(String parent, HdfsFileStatus i,
-      ClientProtocol nnproxy) throws IOException {
-    final LocatedBlocks blks = nnproxy.getBlockLocations(
-        i.getFullPath(new Path(parent)).toUri().getPath(), 0, 1);
+  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i)
+      throws IOException {
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
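
The net effect of the FileDataServlet change is that the redirect handed back to the client now carries the namenode address, so the chosen datanode knows which namenode to proxy against. Assuming JspHelper.NAMENODE_ADDRESS resolves to a parameter name such as nnaddr (an assumption; check JspHelper), a redirect would look roughly like:

    http://dn1.example.com:50075/streamFile/user/alice/part-00000
        ?ugi=alice&delegation=<token>&nnaddr=nn.example.com:8020

All host names, ports, and the path here are illustrative placeholders.
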

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Fri Apr 29 18:16:32 2011
@@ -27,12 +27,16 @@ import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.Closeable;
 import java.net.URI;
+import java.net.UnknownHostException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Properties;
+import java.util.UUID;
 import java.io.RandomAccessFile;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -41,6 +45,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
@@ -51,6 +56,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.net.DNS;
 
 /**
  * NNStorage is responsible for management of the StorageDirectories used by
@@ -134,6 +140,7 @@ public class NNStorage extends Storage i
   final private List<NNStorageListener> listeners;
   private UpgradeManager upgradeManager = null;
   protected MD5Hash imageDigest = null;
+  protected String blockpoolID = ""; // id of the block pool
 
   /**
    * flag that controls if we try to restore failed storages
@@ -161,6 +168,19 @@ public class NNStorage extends Storage i
     this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
   }
 
+  /**
+   * Construct the NNStorage.
+   * @param storageInfo storage information
+   * @param bpid block pool Id
+   */
+  public NNStorage(StorageInfo storageInfo, String bpid) {
+    super(NodeType.NAME_NODE, storageInfo);
+
+    storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
+    this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
+    this.blockpoolID = bpid;
+  }
+
   @Override // Storage
   public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
     if (disablePreUpgradableLayoutCheck) {
@@ -553,7 +573,6 @@ public class NNStorage extends Storage i
    * in this filesystem. */
   private void format(StorageDirectory sd) throws IOException {
    sd.clearDirectory(); // create current dir
-
     for (NNStorageListener listener : listeners) {
       listener.formatOccurred(sd);
     }
@@ -566,9 +585,11 @@ public class NNStorage extends Storage i
   /**
    * Format all available storage directories.
    */
-  public void format() throws IOException {
+  public void format(String clusterId) throws IOException {
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     this.namespaceID = newNamespaceID();
+    this.clusterID = clusterId;
+    this.blockpoolID = newBlockPoolID();
     this.cTime = 0L;
     this.setCheckpointTime(now());
     for (Iterator<StorageDirectory> it =
@@ -599,6 +620,7 @@ public class NNStorage extends Storage i
     return newID;
   }
 
+
   /**
    * Move {@code current} to {@code lastcheckpoint.tmp} and
    * recreate empty {@code current}.
@@ -647,9 +669,17 @@ public class NNStorage extends Storage i
                            StorageDirectory sd
                            ) throws IOException {
     super.getFields(props, sd);
-    if (layoutVersion == 0)
+    if (layoutVersion == 0) {
       throw new IOException("NameNode directory "
                             + sd.getRoot() + " is not formatted.");
+    }
+
+    // Block pool ID is only present in layouts newer than
+    // LAST_PRE_FEDERATION_LAYOUT_VERSION (layout version numbers decrease)
+    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      String sbpid = props.getProperty("blockpoolID");
+      setBlockPoolID(sd.getRoot(), sbpid);
+    }
+    
     String sDUS, sDUV;
     sDUS = props.getProperty("distributedUpgradeState");
     sDUV = props.getProperty("distributedUpgradeVersion");
@@ -689,6 +719,10 @@ public class NNStorage extends Storage i
                            StorageDirectory sd
                            ) throws IOException {
     super.setFields(props, sd);
+    // Persist blockpoolID only for layouts newer than
+    // LAST_PRE_FEDERATION_LAYOUT_VERSION
+    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      props.setProperty("blockpoolID", blockpoolID);
+    }
     boolean uState = getDistributedUpgradeState();
     int uVersion = getDistributedUpgradeVersion();
     if(uState && uVersion != getLayoutVersion()) {
@@ -891,4 +925,104 @@ public class NNStorage extends Storage i
     lsd = listStorageDirectories();
     LOG.debug("at the end current list of storage dirs:" + lsd);
   }
+  
+  /**
+   * Generate new clusterID.
+   * 
+   * clusterID is a persistent attribute of the cluster.
+   * It is generated when the cluster is created and remains the same
+   * during the life cycle of the cluster.  When a new name node is formatted,
+   * a new clusterID is generated and stored if this is a new cluster;
+   * subsequent name nodes must be given the same clusterID during format
+   * to join the same cluster.
+   * When a datanode registers, it receives the clusterID and sticks with it.
+   * If at any point a name node or data node tries to join another cluster,
+   * it will be rejected.
+   * 
+   * @return new clusterID
+   */ 
+  public static String newClusterID() {
+    return "CID-" + UUID.randomUUID().toString();
+  }
+
+  void setClusterID(String cid) {
+    clusterID = cid;
+  }
+
+  /**
+   * Try to find the current cluster id in the VERSION files;
+   * returns the first cluster id found in any VERSION file.
+   * @return clusterId, or null if no cluster id is found
+   */
+  public String determineClusterId() {
+    String cid = null;
+    Iterator<StorageDirectory> sdit = dirIterator(NameNodeDirType.IMAGE);
+    while(sdit.hasNext()) {
+      StorageDirectory sd = sdit.next();
+      try {
+        Properties props = sd.readFrom(sd.getVersionFile());
+        cid = props.getProperty("clusterID");
+        LOG.info("current cluster id for sd="+sd.getCurrentDir() + 
+            ";lv=" + layoutVersion + ";cid=" + cid);
+        
+        if(cid != null && !cid.equals(""))
+          return cid;
+      } catch (Exception e) {
+        LOG.warn("this sd not available: " + e.getLocalizedMessage());
+      } //ignore
+    }
+    LOG.warn("couldn't find any VERSION file containing valid ClusterId");
+    return null;
+  }
+
+  /**
+   * Generate a new block pool id, of the form
+   * BP-<random integer>-<IP address>-<current time millis>.
+   * 
+   * @return new blockpoolID
+   */ 
+  String newBlockPoolID() throws UnknownHostException {
+    String ip = "unknownIP";
+    try {
+      ip = DNS.getDefaultIP("default");
+    } catch (UnknownHostException e) {
+      LOG.warn("Could not find ip address of \"default\" inteface.");
+      throw e;
+    }
+    
+    int rand = 0;
+    try {
+      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
+    } catch (NoSuchAlgorithmException e) {
+      LOG.warn("Could not use SecureRandom; falling back to java.util.Random.");
+      rand = new Random().nextInt(Integer.MAX_VALUE);
+    }
+    String bpid = "BP-" + rand + "-" + ip + "-" + System.currentTimeMillis();
+    return bpid;
+  }
+
+  /** Set block pool ID without validation */
+  void setBlockPoolID(String bpid) {
+    blockpoolID = bpid;
+  }
+
+  /** Validate and set block pool ID */
+  private void setBlockPoolID(File storage, String bpid)
+      throws InconsistentFSStateException {
+    if (bpid == null || bpid.equals("")) {
+      throw new InconsistentFSStateException(storage, "file "
+          + Storage.STORAGE_FILE_VERSION + " has no block pool Id.");
+    }
+    
+    if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
+      throw new InconsistentFSStateException(storage,
+          "Unexepcted blockpoolID " + bpid + " . Expected " + blockpoolID);
+    }
+    blockpoolID = bpid;
+  }
+  
+  public String getBlockPoolID() {
+    return blockpoolID;
+  }
 }
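
Taken together, newClusterID, determineClusterId, and format(String) define the format-time flow: reuse an existing cluster id if one can be found in the VERSION files, otherwise mint a new one, and always generate a fresh block pool id. A hedged sketch of that orchestration; the wrapper method is hypothetical, and only the NNStorage calls are from the patch:

    // Hypothetical helper showing the intended call order at format time.
    static void formatNameNodeStorage(NNStorage storage, String requestedClusterId)
        throws IOException {
      String cid = requestedClusterId;
      if (cid == null || cid.equals("")) {
        cid = storage.determineClusterId(); // reuse id from existing VERSION files
      }
      if (cid == null || cid.equals("")) {
        cid = NNStorage.newClusterID();     // brand-new cluster
      }
      storage.format(cid); // also assigns a fresh blockpoolID internally
    }
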


