hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1143523 [2/4] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/protocol/datatransfe...
Date Wed, 06 Jul 2011 18:32:07 GMT
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jul  6 18:32:04 2011
@@ -85,7 +85,10 @@ class DataXceiver extends Receiver imple
   private long opStartTime; //the start time of receiving an Op
   
   public DataXceiver(Socket s, DataNode datanode, 
-      DataXceiverServer dataXceiverServer) {
+      DataXceiverServer dataXceiverServer) throws IOException {
+    super(new DataInputStream(new BufferedInputStream(
+        NetUtils.getInputStream(s), FSConstants.SMALL_BUFFER_SIZE)));
+
     this.s = s;
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
@@ -122,18 +125,14 @@ class DataXceiver extends Receiver imple
   DataNode getDataNode() {return datanode;}
 
   /**
-   * Read/write data from/to the DataXceiveServer.
+   * Read/write data from/to the DataXceiverServer.
    */
   public void run() {
     updateCurrentThreadName("Waiting for operation");
 
-    DataInputStream in=null; 
     int opsProcessed = 0;
     Op op = null;
     try {
-      in = new DataInputStream(
-          new BufferedInputStream(NetUtils.getInputStream(s), 
-                                  SMALL_BUFFER_SIZE));
       int stdTimeout = s.getSoTimeout();
 
       // We process requests in a loop, and stay around for a short timeout.
@@ -145,7 +144,7 @@ class DataXceiver extends Receiver imple
             assert socketKeepaliveTimeout > 0;
             s.setSoTimeout(socketKeepaliveTimeout);
           }
-          op = readOp(in);
+          op = readOp();
         } catch (InterruptedIOException ignored) {
           // Time out while we wait for client rpc
           break;
@@ -176,7 +175,7 @@ class DataXceiver extends Receiver imple
         }
 
         opStartTime = now();
-        processOp(op, in);
+        processOp(op);
         ++opsProcessed;
       } while (!s.isClosed() && socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
@@ -196,13 +195,12 @@ class DataXceiver extends Receiver imple
     }
   }
 
-  /**
-   * Read a block from the disk.
-   */
   @Override
-  protected void opReadBlock(DataInputStream in, ExtendedBlock block,
-      long startOffset, long length, String clientName,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void readBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length) throws IOException {
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
@@ -225,7 +223,7 @@ class DataXceiver extends Receiver imple
     updateCurrentThreadName("Sending block " + block);
     try {
       try {
-        blockSender = new BlockSender(block, startOffset, length,
+        blockSender = new BlockSender(block, blockOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         LOG.info("opReadBlock " + block + " received exception " + e);
@@ -284,16 +282,17 @@ class DataXceiver extends Receiver imple
     datanode.metrics.incrReadsFromClient(isLocal);
   }
 
-  /**
-   * Write a block to disk.
-   */
   @Override
-  protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, 
-      final int pipelineSize, final BlockConstructionStage stage,
-      final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
-      final String clientname, final DatanodeInfo srcDataNode,
-      final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken
-      ) throws IOException {
+  public void writeBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientname,
+      final DatanodeInfo[] targets,
+      final DatanodeInfo srcDataNode,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
@@ -308,7 +307,7 @@ class DataXceiver extends Receiver imple
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
-      		+ "\n  block  =" + block + ", newGs=" + newGs
+      		+ "\n  block  =" + block + ", newGs=" + latestGenerationStamp
       		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
           + "\n  targets=" + Arrays.asList(targets)
           + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
@@ -351,10 +350,10 @@ class DataXceiver extends Receiver imple
         blockReceiver = new BlockReceiver(block, in, 
             s.getRemoteSocketAddress().toString(),
             s.getLocalSocketAddress().toString(),
-            stage, newGs, minBytesRcvd, maxBytesRcvd,
+            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode);
       } else {
-        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+        datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
 
       //
@@ -380,9 +379,9 @@ class DataXceiver extends Receiver imple
                          SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
-          Sender.opWriteBlock(mirrorOut, originalBlock,
-              pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
-              srcDataNode, targets, blockToken);
+          new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
+              clientname, targets, srcDataNode, stage, pipelineSize,
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
 
           if (blockReceiver != null) { // send checksum header
             blockReceiver.writeChecksumHeader(mirrorOut);
@@ -464,7 +463,7 @@ class DataXceiver extends Receiver imple
       // update its generation stamp
       if (isClient && 
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        block.setGenerationStamp(newGs);
+        block.setGenerationStamp(latestGenerationStamp);
         block.setNumBytes(minBytesRcvd);
       }
       
@@ -499,10 +498,10 @@ class DataXceiver extends Receiver imple
   }
 
   @Override
-  protected void opTransferBlock(final DataInputStream in,
-      final ExtendedBlock blk, final String client,
-      final DatanodeInfo[] targets,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets) throws IOException {
     checkAccess(null, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
 
@@ -511,19 +510,16 @@ class DataXceiver extends Receiver imple
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
-      datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+      datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
       writeResponse(Status.SUCCESS, out);
     } finally {
       IOUtils.closeStream(out);
     }
   }
   
-  /**
-   * Get block checksum (MD5 of CRC32).
-   */
   @Override
-  protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void blockChecksum(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     checkAccess(out, true, block, blockToken,
@@ -572,12 +568,9 @@ class DataXceiver extends Receiver imple
     datanode.metrics.addBlockChecksumOp(elapsed());
   }
 
-  /**
-   * Read a block from the disk and then sends it to a destination.
-   */
   @Override
-  protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void copyBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
     // Read in the header
     if (datanode.isBlockTokenEnabled) {
@@ -647,15 +640,12 @@ class DataXceiver extends Receiver imple
     datanode.metrics.addCopyBlockOp(elapsed());
   }
 
-  /**
-   * Receive a block and write it to disk, it then notifies the namenode to
-   * remove the copy from the source.
-   */
   @Override
-  protected void opReplaceBlock(DataInputStream in,
-      ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
-    updateCurrentThreadName("Replacing block " + block + " from " + sourceID);
+  public void replaceBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo proxySource) throws IOException {
+    updateCurrentThreadName("Replacing block " + block + " from " + delHint);
 
     /* read header */
     block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -699,7 +689,7 @@ class DataXceiver extends Receiver imple
                      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
       /* send request to the proxy */
-      Sender.opCopyBlock(proxyOut, block, blockToken);
+      new Sender(proxyOut).copyBlock(block, blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
@@ -727,7 +717,7 @@ class DataXceiver extends Receiver imple
           dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, sourceID);
+      datanode.notifyNamenodeReceivedBlock(block, delHint);
 
       LOG.info("Moved block " + block + 
           " from " + s.getRemoteSocketAddress());

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Jul  6 18:32:04 2011
@@ -28,12 +28,13 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 
 /**
@@ -128,22 +129,27 @@ class DataXceiverServer implements Runna
                    DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
   }
 
-  /**
-   */
+  @Override
   public void run() {
     while (datanode.shouldRun) {
       try {
         Socket s = ss.accept();
         s.setTcpNoDelay(true);
-        new Daemon(datanode.threadGroup, 
-            new DataXceiver(s, datanode, this)).start();
+        final DataXceiver exciver;
+        try {
+          exciver = new DataXceiver(s, datanode, this);
+        } catch(IOException e) {
+          IOUtils.closeSocket(s);
+          throw e;
+        }
+        new Daemon(datanode.threadGroup, exciver).start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
       } catch (IOException ie) {
-        LOG.warn(datanode.getMachineName() + ":DataXceiveServer: ", ie);
+        LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie);
       } catch (Throwable te) {
         LOG.error(datanode.getMachineName()
-            + ":DataXceiveServer: Exiting due to: ", te);
+            + ":DataXceiverServer: Exiting due to: ", te);
         datanode.shouldRun = false;
       }
     }
@@ -151,7 +157,7 @@ class DataXceiverServer implements Runna
       ss.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getMachineName()
-          + ":DataXceiveServer: Close exception due to: ", ie);
+          + ":DataXceiverServer: Close exception due to: ", ie);
     }
   }
   
@@ -161,7 +167,7 @@ class DataXceiverServer implements Runna
     try {
       this.ss.close();
     } catch (IOException ie) {
-      LOG.warn(datanode.getMachineName() + ":DataXceiveServer.kill(): " 
+      LOG.warn(datanode.getMachineName() + ":DataXceiverServer.kill(): "
                               + StringUtils.stringifyException(ie));
     }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Jul  6 18:32:04 2011
@@ -1150,7 +1150,7 @@ public class FSDataset implements FSCons
       conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                   DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
 
-    String[] dataDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+    String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
 
     int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Wed Jul  6 18:32:04 2011
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -27,13 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import javax.management.JMX;
-import javax.management.MBeanServerConnection;
 import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,24 +41,27 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 import org.znerd.xmlenc.XMLOutputter;
 
 /**
  * This class generates the data that is needed to be displayed on cluster web 
- * console by connecting to each namenode through JMX.
+ * console.
  */
 @InterfaceAudience.Private
 class ClusterJspHelper {
   private static final Log LOG = LogFactory.getLog(ClusterJspHelper.class);
   public static final String OVERALL_STATUS = "overall-status";
   public static final String DEAD = "Dead";
+  private static final String JMX_QRY = 
+    "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo";
   
   /**
    * JSP helper function that generates cluster health report.  When 
    * encountering exception while getting Namenode status, the exception will 
-   * be listed in the page with corresponding stack trace.
+   * be listed on the page with corresponding stack trace.
    */
   ClusterStatus generateClusterHealthReport() {
     ClusterStatus cs = new ClusterStatus();
@@ -79,26 +80,24 @@ class ClusterJspHelper {
       NamenodeMXBeanHelper nnHelper = null;
       try {
         nnHelper = new NamenodeMXBeanHelper(isa, conf);
-        NamenodeStatus nn = nnHelper.getNamenodeStatus();
+        String mbeanProps= queryMbean(nnHelper.httpAddress, conf);
+        NamenodeStatus nn = nnHelper.getNamenodeStatus(mbeanProps);
         if (cs.clusterid.isEmpty() || cs.clusterid.equals("")) { // Set clusterid only once
-          cs.clusterid = nnHelper.getClusterId();
+          cs.clusterid = nnHelper.getClusterId(mbeanProps);
         }
         cs.addNamenodeStatus(nn);
       } catch ( Exception e ) {
         // track exceptions encountered when connecting to namenodes
         cs.addException(isa.getHostName(), e);
         continue;
-      } finally {
-        if (nnHelper != null) {
-          nnHelper.cleanup();
-        }
-      }
+      } 
     }
     return cs;
   }
 
   /**
-   * Helper function that generates the decommissioning report.
+   * Helper function that generates the decommissioning report.  Connect to each
+   * Namenode over http via JmxJsonServlet to collect the data nodes status.
    */
   DecommissionStatus generateDecommissioningReport() {
     String clusterid = "";
@@ -127,21 +126,18 @@ class ClusterJspHelper {
       NamenodeMXBeanHelper nnHelper = null;
       try {
         nnHelper = new NamenodeMXBeanHelper(isa, conf);
+        String mbeanProps= queryMbean(nnHelper.httpAddress, conf);
         if (clusterid.equals("")) {
-          clusterid = nnHelper.getClusterId();
+          clusterid = nnHelper.getClusterId(mbeanProps);
         }
-        nnHelper.getDecomNodeInfoForReport(statusMap);
+        nnHelper.getDecomNodeInfoForReport(statusMap, mbeanProps);
       } catch (Exception e) {
         // catch exceptions encountered while connecting to namenodes
         String nnHost = isa.getHostName();
         decommissionExceptions.put(nnHost, e);
         unreportedNamenode.add(nnHost);
         continue;
-      } finally {
-        if (nnHelper != null) {
-          nnHelper.cleanup();
-        }
-      }
+      } 
     }
     updateUnknownStatus(statusMap, unreportedNamenode);
     getDecommissionNodeClusterState(statusMap);
@@ -260,40 +256,20 @@ class ClusterJspHelper {
     }
     return Integer.parseInt(address.split(":")[1]);
   }
-
+  
   /**
-   * Class for connecting to Namenode over JMX and get attributes
-   * exposed by the MXBean.
+   * Class for connecting to Namenode over http via JmxJsonServlet 
+   * to get JMX attributes exposed by the MXBean.  
    */
   static class NamenodeMXBeanHelper {
     private static final ObjectMapper mapper = new ObjectMapper();
-    private final InetSocketAddress rpcAddress;
     private final String host;
-    private final Configuration conf;
-    private final JMXConnector connector;
-    private final NameNodeMXBean mxbeanProxy;
+    private final String httpAddress;
     
     NamenodeMXBeanHelper(InetSocketAddress addr, Configuration conf)
         throws IOException, MalformedObjectNameException {
-      this.rpcAddress = addr;
       this.host = addr.getHostName();
-      this.conf = conf;
-      int port = conf.getInt("dfs.namenode.jmxport", -1);
-      
-      JMXServiceURL jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"
-          + host + ":" + port + "/jmxrmi");
-      connector = JMXConnectorFactory.connect(jmxURL);
-      mxbeanProxy = getNamenodeMxBean();
-    }
-    
-    private NameNodeMXBean getNamenodeMxBean()
-        throws IOException, MalformedObjectNameException {
-      // Get an MBeanServerConnection on the remote VM.
-      MBeanServerConnection remote = connector.getMBeanServerConnection();
-      ObjectName mxbeanName = new ObjectName(
-          "Hadoop:service=NameNode,name=NameNodeInfo");
-
-      return JMX.newMXBeanProxy(remote, mxbeanName, NameNodeMXBean.class);
+      this.httpAddress = DFSUtil.getInfoServer(addr, conf, false);
     }
     
     /** Get the map corresponding to the JSON string */
@@ -305,10 +281,9 @@ class ClusterJspHelper {
     }
     
     /**
-     * Process JSON string returned from JMX connection to get the number of
-     * live datanodes.
+     * Get the number of live datanodes.
      * 
-     * @param json JSON output from JMX call that contains live node status.
+     * @param json JSON string that contains live node status.
      * @param nn namenode status to return information in
      */
     private static void getLiveNodeCount(String json, NamenodeStatus nn)
@@ -333,11 +308,10 @@ class ClusterJspHelper {
     }
   
     /**
-     * Count the number of dead datanode based on the JSON string returned from
-     * JMX call.
+     * Count the number of dead datanode.
      * 
      * @param nn namenode
-     * @param json JSON string returned from JMX call
+     * @param json JSON string
      */
     private static void getDeadNodeCount(String json, NamenodeStatus nn)
         throws IOException {
@@ -358,51 +332,53 @@ class ClusterJspHelper {
       }
     }
   
-    public String getClusterId() {
-      return mxbeanProxy.getClusterId();
+    public String getClusterId(String props) throws IOException {
+      return getProperty(props, "ClusterId").getTextValue();
     }
     
-    public NamenodeStatus getNamenodeStatus()
-        throws IOException, MalformedObjectNameException {
+    public NamenodeStatus getNamenodeStatus(String props) throws IOException,
+        MalformedObjectNameException, NumberFormatException {
       NamenodeStatus nn = new NamenodeStatus();
       nn.host = host;
-      nn.filesAndDirectories = mxbeanProxy.getTotalFiles();
-      nn.capacity = mxbeanProxy.getTotal();
-      nn.free = mxbeanProxy.getFree();
-      nn.bpUsed = mxbeanProxy.getBlockPoolUsedSpace();
-      nn.nonDfsUsed = mxbeanProxy.getNonDfsUsedSpace();
-      nn.blocksCount = mxbeanProxy.getTotalBlocks();
-      nn.missingBlocksCount = mxbeanProxy.getNumberOfMissingBlocks();
-      nn.free = mxbeanProxy.getFree();
-      nn.httpAddress = DFSUtil.getInfoServer(rpcAddress, conf, false);
-      getLiveNodeCount(mxbeanProxy.getLiveNodes(), nn);
-      getDeadNodeCount(mxbeanProxy.getDeadNodes(), nn);
+      nn.filesAndDirectories = getProperty(props, "TotalFiles").getLongValue();
+      nn.capacity = getProperty(props, "Total").getLongValue();
+      nn.free = getProperty(props, "Free").getLongValue();
+      nn.bpUsed = getProperty(props, "BlockPoolUsedSpace").getLongValue();
+      nn.nonDfsUsed = getProperty(props, "NonDfsUsedSpace").getLongValue();
+      nn.blocksCount = getProperty(props, "TotalBlocks").getLongValue();
+      nn.missingBlocksCount = getProperty(props, "NumberOfMissingBlocks")
+          .getLongValue();
+      nn.httpAddress = httpAddress;
+      getLiveNodeCount(getProperty(props, "LiveNodes").getValueAsText(), nn);
+      getDeadNodeCount(getProperty(props, "DeadNodes").getValueAsText(), nn);
       return nn;
     }
     
     /**
-     * Connect to namenode to get decommission node information.
+     * Get the decommission node information.
      * @param statusMap data node status map
-     * @param connector JMXConnector
+     * @param props string
      */
     private void getDecomNodeInfoForReport(
-        Map<String, Map<String, String>> statusMap) throws IOException,
-        MalformedObjectNameException {
-      getLiveNodeStatus(statusMap, host, mxbeanProxy.getLiveNodes());
-      getDeadNodeStatus(statusMap, host, mxbeanProxy.getDeadNodes());
-      getDecommissionNodeStatus(statusMap, host, mxbeanProxy.getDecomNodes());
+        Map<String, Map<String, String>> statusMap, String props)
+        throws IOException, MalformedObjectNameException {
+      getLiveNodeStatus(statusMap, host, getProperty(props, "LiveNodes")
+          .getValueAsText());
+      getDeadNodeStatus(statusMap, host, getProperty(props, "DeadNodes")
+          .getValueAsText());
+      getDecommissionNodeStatus(statusMap, host,
+          getProperty(props, "DecomNodes").getValueAsText());
     }
   
     /**
-     * Process the JSON string returned from JMX call to get live datanode
-     * status. Store the information into datanode status map and
-     * Decommissionnode.
+     * Store the live datanode status information into datanode status map and
+     * DecommissionNode.
      * 
      * @param statusMap Map of datanode status. Key is datanode, value
      *          is an inner map whose key is namenode, value is datanode status.
      *          reported by each namenode.
      * @param namenodeHost host name of the namenode
-     * @param decomnode update Decommissionnode with alive node status
+     * @param decomnode update DecommissionNode with alive node status
      * @param json JSON string contains datanode status
      * @throws IOException
      */
@@ -434,15 +410,14 @@ class ClusterJspHelper {
     }
   
     /**
-     * Process the JSON string returned from JMX connection to get the dead
-     * datanode information. Store the information into datanode status map and
-     * Decommissionnode.
+     * Store the dead datanode information into datanode status map and
+     * DecommissionNode.
      * 
      * @param statusMap map with key being datanode, value being an
      *          inner map (key:namenode, value:decommisionning state).
      * @param host datanode hostname
-     * @param decomnode
-     * @param json
+     * @param decomnode DecommissionNode
+     * @param json String
      * @throws IOException
      */
     private static void getDeadNodeStatus(
@@ -478,14 +453,13 @@ class ClusterJspHelper {
     }
   
     /**
-     * We process the JSON string returned from JMX connection to get the
-     * decommisioning datanode information.
+     * Get the decommisioning datanode information.
      * 
      * @param dataNodeStatusMap map with key being datanode, value being an
      *          inner map (key:namenode, value:decommisionning state).
      * @param host datanode
-     * @param decomnode Decommissionnode
-     * @param json JSON string returned from JMX connection
+     * @param decomnode DecommissionNode
+     * @param json String
      */
     private static void getDecommissionNodeStatus(
         Map<String, Map<String, String>> dataNodeStatusMap, String host,
@@ -508,19 +482,6 @@ class ClusterJspHelper {
         dataNodeStatusMap.put(dn, nnStatus);
       }
     }
-  
-    
-    public void cleanup() {
-      if (connector != null) {
-        try {
-          connector.close();
-        } catch (Exception e) {
-          // log failure of close jmx connection
-          LOG.warn("Unable to close JMX connection. "
-              + StringUtils.stringifyException(e));
-        }
-      }
-    }
   }
 
   /**
@@ -893,4 +854,47 @@ class ClusterJspHelper {
     doc.endTag(); // message
     doc.endTag(); // cluster
   }
+  
+  /**
+   * Read in the content from a URL
+   * @param url URL To read
+   * @return the text from the output
+   * @throws IOException if something went wrong
+   */
+  private static String readOutput(URL url) throws IOException {
+    StringBuilder out = new StringBuilder();
+    URLConnection connection = url.openConnection();
+    BufferedReader in = new BufferedReader(
+                            new InputStreamReader(
+                            connection.getInputStream()));
+    String inputLine;
+    while ((inputLine = in.readLine()) != null) {
+      out.append(inputLine);
+    }
+    in.close();
+    return out.toString();
+  }
+
+  private static String queryMbean(String httpAddress, Configuration conf) 
+    throws IOException {
+    URL url = new URL("http://"+httpAddress+JMX_QRY);
+    return readOutput(url);
+  }
+  /**
+   * In order to query a namenode mxbean, a http connection in the form of
+   * "http://hostname/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
+   * is sent to namenode.  JMX attributes are exposed via JmxJsonServelet on 
+   * the namenode side.
+   */
+  private static JsonNode getProperty(String props, String propertyname)
+  throws IOException {
+    if (props == null || props.equals("") || propertyname == null 
+        || propertyname.equals("")) {
+      return null;
+    }
+    ObjectMapper m = new ObjectMapper();
+    JsonNode rootNode = m.readValue(props, JsonNode.class);
+    JsonNode jn = rootNode.get("beans").get(0).get(propertyname);
+    return jn;
+  }
 } 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Wed Jul  6 18:32:04 2011
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.util.CyclicIteration;
 
 /**

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Jul  6 18:32:04 2011
@@ -154,19 +154,31 @@ class EditLogFileOutputStream extends Ed
     
     // close should have been called after all pending transactions
     // have been flushed & synced.
-    int bufSize = bufCurrent.size();
-    if (bufSize != 0) {
-      throw new IOException("FSEditStream has " + bufSize
-          + " bytes still to be flushed and cannot " + "be closed.");
+    // if already closed, just skip
+    if(bufCurrent != null)
+    {
+      int bufSize = bufCurrent.size();
+      if (bufSize != 0) {
+        throw new IOException("FSEditStream has " + bufSize
+            + " bytes still to be flushed and cannot " + "be closed.");
+      }
+      bufCurrent.close();
+      bufCurrent = null;
     }
-    bufCurrent.close();
-    bufReady.close();
 
-    // remove the last INVALID marker from transaction log.
-    fc.truncate(fc.position());
-    fp.close();
+    if(bufReady != null) {
+      bufReady.close();
+      bufReady = null;
+    }
 
-    bufCurrent = bufReady = null;
+    // remove the last INVALID marker from transaction log.
+    if (fc != null && fc.isOpen()) {
+      fc.truncate(fc.position());
+      fc.close();
+    }
+    if (fp != null) {
+      fp.close();
+    }
     fp = null;
   }
   

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Jul  6 18:32:04 2011
@@ -17,40 +17,47 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
-import org.apache.hadoop.hdfs.protocol.FSLimitException.*;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.util.ByteArray;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -61,7 +68,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
  * and logged to disk.
  * 
  *************************************************/
-class FSDirectory implements Closeable {
+public class FSDirectory implements Closeable {
 
   INodeDirectoryWithQuota rootDir;
   FSImage fsImage;  
@@ -1337,7 +1344,7 @@ class FSDirectory implements Closeable {
    * @throws QuotaExceededException if the new count violates any quota limit
    * @throws FileNotFound if path does not exist.
    */
-  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+  public void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
                                          throws QuotaExceededException,
                                                 FileNotFoundException,
                                                 UnresolvedLinkException {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Jul  6 18:32:04 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
@@ -33,12 +35,32 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 public class FSEditLogLoader {
   private final FSNamesystem fsNamesys;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Wed Jul  6 18:32:04 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Jul  6 18:32:04 2011
@@ -414,7 +414,12 @@ public class FSImage implements Closeabl
       LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
     }
     isUpgradeFinalized = false;
-    storage.reportErrorsOnDirectories(errorSDs);
+    if (!errorSDs.isEmpty()) {
+      storage.reportErrorsOnDirectories(errorSDs);
+      //during upgrade, it's a fatal error to fail any storage directory
+      throw new IOException("Upgrade failed in " + errorSDs.size()
+          + " storage directory(ies), previously logged.");
+    }
     storage.initializeDistributedUpgrade();
   }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Jul  6 18:32:04 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.io.MD5Hash;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Wed Jul  6 18:32:04 2011
@@ -22,7 +22,6 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -33,6 +32,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Jul  6 18:32:04 2011
@@ -31,6 +31,7 @@ import java.io.PrintWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -96,15 +97,20 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -235,7 +241,7 @@ public class FSNamesystem implements FSC
   // Stores the correct file name hierarchy
   //
   public FSDirectory dir;
-  BlockManager blockManager;
+  public BlockManager blockManager;
   
   // Block pool ID used by this namenode
   String blockPoolId;
@@ -270,10 +276,10 @@ public class FSNamesystem implements FSC
    * Stores a set of DatanodeDescriptor objects.
    * This is a subset of {@link #datanodeMap}, containing nodes that are 
    * considered alive.
-   * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+   * The HeartbeatMonitor periodically checks for out-dated entries,
    * and removes them from the list.
    */
-  ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
+  public ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
 
   public LeaseManager leaseManager = new LeaseManager(this); 
 
@@ -314,8 +320,8 @@ public class FSNamesystem implements FSC
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
     
-  // datanode networktoplogy
-  NetworkTopology clusterMap = new NetworkTopology();
+  /** datanode network toplogy */
+  public NetworkTopology clusterMap = new NetworkTopology();
   private DNSToSwitchMapping dnsToSwitchMapping;
 
   private HostsFileReader hostsReader; 
@@ -329,7 +335,7 @@ public class FSNamesystem implements FSC
   private final GenerationStamp generationStamp = new GenerationStamp();
 
   // Ask Datanode only up to this many blocks to delete.
-  int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
+  public int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
 
   // precision of access times.
   private long accessTimePrecision = 0;
@@ -472,23 +478,23 @@ public class FSNamesystem implements FSC
   }
 
   // utility methods to acquire and release read lock and write lock
-  void readLock() {
+  public void readLock() {
     this.fsLock.readLock().lock();
   }
 
-  void readUnlock() {
+  public void readUnlock() {
     this.fsLock.readLock().unlock();
   }
 
-  void writeLock() {
+  public void writeLock() {
     this.fsLock.writeLock().lock();
   }
 
-  void writeUnlock() {
+  public void writeUnlock() {
     this.fsLock.writeLock().unlock();
   }
 
-  boolean hasWriteLock() {
+  public boolean hasWriteLock() {
     return this.fsLock.isWriteLockedByCurrentThread();
   }
 
@@ -1014,7 +1020,7 @@ public class FSNamesystem implements FSC
   }
 
   /** Create a LocatedBlock. */
-  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+  public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
       final long offset, final boolean corrupt) throws IOException {
     return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
   }
@@ -3013,7 +3019,7 @@ public class FSNamesystem implements FSC
    * @return an array of datanode commands 
    * @throws IOException
    */
-  DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes) 
         throws IOException {
@@ -3521,7 +3527,7 @@ public class FSNamesystem implements FSC
    * If no such a node is available,
    * then pick a node with least free space
    */
-  void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
+  public void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
@@ -3785,9 +3791,19 @@ public class FSNamesystem implements FSC
             nodes.add(dn);
           }
           //Remove any form of the this datanode in include/exclude lists.
-          mustList.remove(dn.getName());
-          mustList.remove(dn.getHost());
-          mustList.remove(dn.getHostName());
+          try {
+            InetAddress inet = InetAddress.getByName(dn.getHost());
+            // compare hostname(:port)
+            mustList.remove(inet.getHostName());
+            mustList.remove(inet.getHostName()+":"+dn.getPort());
+            // compare ipaddress(:port)
+            mustList.remove(inet.getHostAddress().toString());
+            mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
+          } catch ( UnknownHostException e ) {
+            mustList.remove(dn.getName());
+            mustList.remove(dn.getHost());
+            LOG.warn(e);
+          }
         }
       }
       
@@ -3970,45 +3986,6 @@ public class FSNamesystem implements FSC
   }
     
   /**
-   * A immutable object that stores the number of live replicas and
-   * the number of decommissined Replicas.
-   */
-  static class NumberReplicas {
-    private int liveReplicas;
-    int decommissionedReplicas;
-    private int corruptReplicas;
-    private int excessReplicas;
-
-    NumberReplicas() {
-      initialize(0, 0, 0, 0);
-    }
-
-    NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
-      initialize(live, decommissioned, corrupt, excess);
-    }
-
-    void initialize(int live, int decommissioned, int corrupt, int excess) {
-      liveReplicas = live;
-      decommissionedReplicas = decommissioned;
-      corruptReplicas = corrupt;
-      excessReplicas = excess;
-    }
-
-    int liveReplicas() {
-      return liveReplicas;
-    }
-    int decommissionedReplicas() {
-      return decommissionedReplicas;
-    }
-    int corruptReplicas() {
-      return corruptReplicas;
-    }
-    int excessReplicas() {
-      return excessReplicas;
-    }
-  } 
-
-  /**
    * Change, if appropriate, the admin state of a datanode to 
    * decommission completed. Return true if decommission is complete.
    */
@@ -4032,23 +4009,62 @@ public class FSNamesystem implements FSC
    */
   private boolean inHostsList(DatanodeID node, String ipAddr) {
     Set<String> hostsList = hostsReader.getHosts();
-    return (hostsList.isEmpty() || 
-            (ipAddr != null && hostsList.contains(ipAddr)) ||
-            hostsList.contains(node.getHost()) ||
-            hostsList.contains(node.getName()) || 
-            ((node instanceof DatanodeInfo) && 
-             hostsList.contains(((DatanodeInfo)node).getHostName())));
+     return checkInList(node, ipAddr, hostsList, false);
   }
   
   private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
     Set<String> excludeList = hostsReader.getExcludedHosts();
-    return  ((ipAddr != null && excludeList.contains(ipAddr)) ||
-            excludeList.contains(node.getHost()) ||
-            excludeList.contains(node.getName()) ||
-            ((node instanceof DatanodeInfo) && 
-             excludeList.contains(((DatanodeInfo)node).getHostName())));
+    return checkInList(node, ipAddr, excludeList, true);
   }
 
+
+  /**
+   * Check if the given node (of DatanodeID or ipAddress) is in the (include or 
+   * exclude) list.  If ipAddress in null, check only based upon the given 
+   * DatanodeID.  If ipAddress is not null, the ipAddress should refers to the
+   * same host that given DatanodeID refers to.
+   * 
+   * @param node, DatanodeID, the host DatanodeID
+   * @param ipAddress, if not null, should refers to the same host
+   *                   that DatanodeID refers to
+   * @param hostsList, the list of hosts in the include/exclude file
+   * @param isExcludeList, boolean, true if this is the exclude list
+   * @return boolean, if in the list
+   */
+  private boolean checkInList(DatanodeID node, String ipAddress,
+      Set<String> hostsList, boolean isExcludeList) {
+    InetAddress iaddr = null;
+    try {
+      if (ipAddress != null) {
+        iaddr = InetAddress.getByName(ipAddress);
+      } else {
+        iaddr = InetAddress.getByName(node.getHost());
+      }
+    }catch (UnknownHostException e) {
+      LOG.warn("Unknown host in host list: "+ipAddress);
+      // can't resolve the host name.
+      if (isExcludeList){
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    // if include list is empty, host is in include list
+    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
+      return true;
+    }
+    return // compare ipaddress(:port)
+    (hostsList.contains(iaddr.getHostAddress().toString()))
+        || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+            + node.getPort()))
+        // compare hostname(:port)
+        || (hostsList.contains(iaddr.getHostName()))
+        || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
+        || ((node instanceof DatanodeInfo) && hostsList
+            .contains(((DatanodeInfo) node).getHostName()));
+  }
+  
   /**
    * Rereads the config to get hosts and exclude list file names.
    * Rereads the files to update the hosts and exclude lists.  It
@@ -4626,7 +4642,7 @@ public class FSNamesystem implements FSC
    * Check whether the name node is in safe mode.
    * @return true if safe mode is ON, false otherwise
    */
-  boolean isInSafeMode() {
+  public boolean isInSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -4637,7 +4653,7 @@ public class FSNamesystem implements FSC
   /**
    * Check whether the name node is in startup mode.
    */
-  boolean isInStartupSafeMode() {
+  public boolean isInStartupSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -4648,7 +4664,7 @@ public class FSNamesystem implements FSC
   /**
    * Check whether replication queues are populated.
    */
-  boolean isPopulatingReplQueues() {
+  public boolean isPopulatingReplQueues() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -4660,7 +4676,7 @@ public class FSNamesystem implements FSC
    * Increment number of blocks that reached minimal replication.
    * @param replication current replication 
    */
-  void incrementSafeBlockCount(int replication) {
+  public void incrementSafeBlockCount(int replication) {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
@@ -4671,7 +4687,7 @@ public class FSNamesystem implements FSC
   /**
    * Decrement number of blocks that reached minimal replication.
    */
-  void decrementSafeBlockCount(Block b) {
+  public void decrementSafeBlockCount(Block b) {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
@@ -4978,13 +4994,13 @@ public class FSNamesystem implements FSC
   @Override // FSNamesystemMBean
   @Metric
   public long getPendingReplicationBlocks() {
-    return blockManager.pendingReplicationBlocksCount;
+    return blockManager.getPendingReplicationBlocksCount();
   }
 
   @Override // FSNamesystemMBean
   @Metric
   public long getUnderReplicatedBlocks() {
-    return blockManager.underReplicatedBlocksCount;
+    return blockManager.getUnderReplicatedBlocksCount();
   }
 
   /** Return number of under-replicated but not missing blocks */
@@ -4995,23 +5011,23 @@ public class FSNamesystem implements FSC
   /** Returns number of blocks with corrupt replicas */
   @Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
   public long getCorruptReplicaBlocks() {
-    return blockManager.corruptReplicaBlocksCount;
+    return blockManager.getCorruptReplicaBlocksCount();
   }
 
   @Override // FSNamesystemMBean
   @Metric
   public long getScheduledReplicationBlocks() {
-    return blockManager.scheduledReplicationBlocksCount;
+    return blockManager.getScheduledReplicationBlocksCount();
   }
 
   @Metric
   public long getPendingDeletionBlocks() {
-    return blockManager.pendingDeletionBlocksCount;
+    return blockManager.getPendingDeletionBlocksCount();
   }
 
   @Metric
   public long getExcessBlocks() {
-    return blockManager.excessBlocksCount;
+    return blockManager.getExcessBlocksCount();
   }
   
   @Metric
@@ -5380,7 +5396,7 @@ public class FSNamesystem implements FSC
   }
 
   /** Get a datanode descriptor given corresponding storageID */
-  DatanodeDescriptor getDatanode(String nodeID) {
+  public DatanodeDescriptor getDatanode(String nodeID) {
     assert hasReadOrWriteLock();
     return datanodeMap.get(nodeID);
   }
@@ -5444,7 +5460,7 @@ public class FSNamesystem implements FSC
       if (startBlockAfter != null) {
         startBlockId = Block.filename2id(startBlockAfter);
       }
-      BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
+      UnderReplicatedBlocks.BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
         INode inode = blockManager.getINode(blk);

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Wed Jul  6 18:32:04 2011
@@ -117,7 +117,7 @@ public class FileDataServlet extends Dfs
               .getParameter(JspHelper.DELEGATION_PARAMETER_NAME);
 
           HdfsFileStatus info = nn.getFileInfo(path);
-          if ((info != null) && !info.isDir()) {
+          if (info != null && !info.isDir()) {
             try {
               response.sendRedirect(createUri(path, info, ugi, nn, request,
                   delegationToken).toURL().toString());

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java Wed Jul  6 18:32:04 2011
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Random;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+
 class Host2NodesMap {
   private HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Jul  6 18:32:04 2011
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.server.na
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -32,7 +34,7 @@ import org.apache.hadoop.util.StringUtil
  * This is a base INode class containing common fields for file and 
  * directory inodes.
  */
-abstract class INode implements Comparable<byte[]>, FSInodeInfo {
+public abstract class INode implements Comparable<byte[]>, FSInodeInfo {
   /*
    *  The inode name is in java UTF8 encoding; 
    *  The name in HdfsFileStatus should keep the same encoding as this.
@@ -324,7 +326,7 @@ abstract class INode implements Comparab
   /**
    * Is this inode being constructed?
    */
-  boolean isUnderConstruction() {
+  public boolean isUnderConstruction() {
     return false;
   }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Jul  6 18:32:04 2011
@@ -409,20 +409,31 @@ class INodeDirectory extends INode {
 
   /** {@inheritDoc} */
   long[] computeContentSummary(long[] summary) {
+    // Walk through the children of this node, using a new summary array
+    // for the (sub)tree rooted at this node
+    assert 4 == summary.length;
+    long[] subtreeSummary = new long[]{0,0,0,0};
     if (children != null) {
       for (INode child : children) {
-        child.computeContentSummary(summary);
+        child.computeContentSummary(subtreeSummary);
       }
     }
     if (this instanceof INodeDirectoryWithQuota) {
       // Warn if the cached and computed diskspace values differ
       INodeDirectoryWithQuota node = (INodeDirectoryWithQuota)this;
       long space = node.diskspaceConsumed();
-      if (-1 != node.getDsQuota() && space != summary[3]) {
+      assert -1 == node.getDsQuota() || space == subtreeSummary[3];
+      if (-1 != node.getDsQuota() && space != subtreeSummary[3]) {
         NameNode.LOG.warn("Inconsistent diskspace for directory "
-            +getLocalName()+". Cached: "+space+" Computed: "+summary[3]);
+            +getLocalName()+". Cached: "+space+" Computed: "+subtreeSummary[3]);
       }
     }
+
+    // update the passed summary array with the values for this node's subtree
+    for (int i = 0; i < summary.length; i++) {
+      summary[i] += subtreeSummary[i];
+    }
+
     summary[2]++;
     return summary;
   }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Jul  6 18:32:04 2011
@@ -24,8 +24,11 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 
-class INodeFile extends INode {
+/** I-node for closed file. */
+public class INodeFile extends INode {
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
 
   //Number of bits for Block size
@@ -106,7 +109,7 @@ class INodeFile extends INode {
    * Get file blocks 
    * @return file blocks
    */
-  BlockInfo[] getBlocks() {
+  public BlockInfo[] getBlocks() {
     return this.blocks;
   }
 
@@ -149,7 +152,7 @@ class INodeFile extends INode {
   /**
    * Set file block
    */
-  void setBlock(int idx, BlockInfo blk) {
+  public void setBlock(int idx, BlockInfo blk) {
     this.blocks[idx] = blk;
   }
 
@@ -237,7 +240,7 @@ class INodeFile extends INode {
    * Get the last block of the file.
    * Make sure it has the right type.
    */
-  <T extends BlockInfo> T getLastBlock() throws IOException {
+  public <T extends BlockInfo> T getLastBlock() throws IOException {
     if (blocks == null || blocks.length == 0)
       return null;
     T returnBlock = null;
@@ -252,7 +255,8 @@ class INodeFile extends INode {
     return returnBlock;
   }
 
-  int numBlocks() {
+  /** @return the number of blocks */ 
+  public int numBlocks() {
     return blocks == null ? 0 : blocks.length;
   }
 }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Wed Jul  6 18:32:04 2011
@@ -21,10 +21,15 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 
-
-class INodeFileUnderConstruction extends INodeFile {
+/**
+ * I-node for file being written.
+ */
+public class INodeFileUnderConstruction extends INodeFile {
   private  String clientName;         // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
@@ -43,7 +48,7 @@ class INodeFileUnderConstruction extends
     this.clientNode = clientNode;
   }
 
-  public INodeFileUnderConstruction(byte[] name,
+  INodeFileUnderConstruction(byte[] name,
                              short blockReplication,
                              long modificationTime,
                              long preferredBlockSize,
@@ -80,7 +85,7 @@ class INodeFileUnderConstruction extends
    * Is this inode being constructed?
    */
   @Override
-  boolean isUnderConstruction() {
+  public boolean isUnderConstruction() {
     return true;
   }
 
@@ -122,7 +127,7 @@ class INodeFileUnderConstruction extends
    * Convert the last block of the file to an under-construction block.
    * Set its locations.
    */
-  BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+  public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
                                           DatanodeDescriptor[] targets)
   throws IOException {
     if (blocks == null || blocks.length == 0) {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Wed Jul  6 18:32:04 2011
@@ -163,7 +163,7 @@ public class LeaseManager {
   /**
    * Finds the pathname for the specified pendingFile
    */
-  synchronized String findPath(INodeFileUnderConstruction pendingFile)
+  public synchronized String findPath(INodeFileUnderConstruction pendingFile)
       throws IOException {
     Lease lease = getLease(pendingFile.getClientName());
     if (lease != null) {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Wed Jul  6 18:32:04 2011
@@ -464,6 +464,7 @@ public class NNStorage extends Storage i
         // Close any edits stream associated with this dir and remove directory
         LOG.warn("writeTransactionIdToStorage failed on " + sd,
             e);
+        reportErrorsOnDirectory(sd);
       }
     }
   }
@@ -828,17 +829,17 @@ public class NNStorage extends Storage i
    * @throws IOException
    */
   void reportErrorsOnDirectory(StorageDirectory sd) {
-    LOG.warn("Error reported on storage directory " + sd);
+    LOG.error("Error reported on storage directory " + sd);
 
     String lsd = listStorageDirectories();
     LOG.debug("current list of storage dirs:" + lsd);
 
-    LOG.info("About to remove corresponding storage: "
+    LOG.warn("About to remove corresponding storage: "
              + sd.getRoot().getAbsolutePath());
     try {
       sd.unlock();
     } catch (Exception e) {
-      LOG.info("Unable to unlock bad storage directory: "
+      LOG.warn("Unable to unlock bad storage directory: "
                +  sd.getRoot().getPath(), e);
     }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Jul  6 18:32:04 2011
@@ -248,7 +248,7 @@ public class NameNode implements Namenod
   /** Return the {@link FSNamesystem} object.
    * @return {@link FSNamesystem} object.
    */
-  FSNamesystem getNamesystem() {
+  public FSNamesystem getNamesystem() {
     return namesystem;
   }
 
@@ -439,6 +439,13 @@ public class NameNode implements Namenod
     // The rpc-server port can be ephemeral... ensure we have the correct info
     this.rpcAddress = this.server.getListenerAddress(); 
     setRpcServerAddress(conf);
+    
+    try {
+      validateConfigurationSettings(conf);
+    } catch (IOException e) {
+      LOG.fatal(e.toString());
+      throw e;
+    }
 
     activate(conf);
     LOG.info(getRole() + " up at: " + rpcAddress);
@@ -446,6 +453,28 @@ public class NameNode implements Namenod
       LOG.info(getRole() + " service server is up at: " + serviceRPCAddress); 
     }
   }
+  
+  /**
+   * Verifies that the final Configuration Settings look ok for the NameNode to
+   * properly start up
+   * Things to check for include:
+   * - HTTP Server Port does not equal the RPC Server Port
+   * @param conf
+   * @throws IOException
+   */
+  protected void validateConfigurationSettings(final Configuration conf) 
+      throws IOException {
+    // check to make sure the web port and rpc port do not match 
+    if(getHttpServerAddress(conf).getPort() 
+        == getRpcServerAddress(conf).getPort()) {
+      String errMsg = "dfs.namenode.rpc-address " +
+          "("+ getRpcServerAddress(conf) + ") and " +
+          "dfs.namenode.http-address ("+ getHttpServerAddress(conf) + ") " +
+          "configuration keys are bound to the same port, unable to start " +
+          "NameNode. Port: " + getRpcServerAddress(conf).getPort();
+      throw new IOException(errMsg);
+    } 
+  }
 
   /**
    * Activate name-node servers and threads.

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Jul  6 18:32:04 2011
@@ -36,20 +36,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Jul  6 18:32:04 2011
@@ -36,24 +36,24 @@ import javax.servlet.http.HttpServletRes
 import javax.servlet.jsp.JspWriter;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-
-import org.znerd.xmlenc.*;
+import org.znerd.xmlenc.XMLOutputter;
 
 class NamenodeJspHelper {
   static String getSafeModeText(FSNamesystem fsn) {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Wed Jul  6 18:32:04 2011
@@ -253,7 +253,7 @@ public class SecondaryNameNode implement
             infoServer.addSslListener(secInfoSocAddr, conf, false, true);
           }
           
-          infoServer.setAttribute("secondary.name.node", this);
+          infoServer.setAttribute("secondary.name.node", SecondaryNameNode.this);
           infoServer.setAttribute("name.system.image", checkpointImage);
           infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
           infoServer.addInternalServlet("getimage", "/getimage",

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Wed Jul  6 18:32:04 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSInputSt
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.mortbay.jetty.InclusiveByteRange;
 
@@ -46,7 +47,7 @@ public class StreamFile extends DfsServl
 
   public static final String CONTENT_LENGTH = "Content-Length";
 
-  /** getting a client for connecting to dfs */
+  /* Return a DFS client to use to make the given HTTP request */
   protected DFSClient getDFSClient(HttpServletRequest request)
       throws IOException, InterruptedException {
     final Configuration conf =
@@ -57,6 +58,7 @@ public class StreamFile extends DfsServl
     return DatanodeJspHelper.getDFSClient(request, datanode, conf, ugi);
   }
   
+  @SuppressWarnings("unchecked")
   public void doGet(HttpServletRequest request, HttpServletResponse response)
     throws ServletException, IOException {
     final String path = request.getPathInfo() != null ? 
@@ -69,9 +71,10 @@ public class StreamFile extends DfsServl
       return;
     }
     
-    Enumeration<?> reqRanges = request.getHeaders("Range");
-    if (reqRanges != null && !reqRanges.hasMoreElements())
+    Enumeration<String> reqRanges = request.getHeaders("Range");
+    if (reqRanges != null && !reqRanges.hasMoreElements()) {
       reqRanges = null;
+    }
 
     DFSClient dfs;
     try {
@@ -81,107 +84,82 @@ public class StreamFile extends DfsServl
       return;
     }
     
-    final DFSInputStream in = dfs.open(filename);
-    final long fileLen = in.getFileLength();
-    OutputStream os = response.getOutputStream();
+    DFSInputStream in = null;
+    OutputStream out = null;
 
     try {
+      in = dfs.open(filename);
+      out = response.getOutputStream();
+      final long fileLen = in.getFileLength();
       if (reqRanges != null) {
-        List<?> ranges = InclusiveByteRange.satisfiableRanges(reqRanges,
-                                                           fileLen);
-        StreamFile.sendPartialData(in, os, response, fileLen, ranges);
+        List<InclusiveByteRange> ranges = 
+          InclusiveByteRange.satisfiableRanges(reqRanges, fileLen);
+        StreamFile.sendPartialData(in, out, response, fileLen, ranges);
       } else {
         // No ranges, so send entire file
         response.setHeader("Content-Disposition", "attachment; filename=\"" + 
                            filename + "\"");
         response.setContentType("application/octet-stream");
         response.setHeader(CONTENT_LENGTH, "" + fileLen);
-        StreamFile.writeTo(in, os, 0L, fileLen);
+        StreamFile.copyFromOffset(in, out, 0L, fileLen);
       }
-    } catch(IOException e) {
+      in.close();
+      in = null;
+      out.close();
+      out = null;
+      dfs.close();
+      dfs = null;
+    } catch (IOException ioe) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("response.isCommitted()=" + response.isCommitted(), e);
+        LOG.debug("response.isCommitted()=" + response.isCommitted(), ioe);
       }
-      throw e;
+      throw ioe;
     } finally {
-      try {
-        in.close();
-        os.close();
-      } finally {
-        dfs.close();
-      }
-    }      
+      IOUtils.cleanup(LOG, in);
+      IOUtils.cleanup(LOG, out);
+      IOUtils.cleanup(LOG, dfs);
+    }
   }
   
+  /**
+   * Send a partial content response with the given range. If there are
+   * no satisfiable ranges, or if multiple ranges are requested, which
+   * is unsupported, respond with range not satisfiable.
+   *
+   * @param in stream to read from
+   * @param out stream to write to
+   * @param response http response to use
+   * @param contentLength for the response header
+   * @param ranges to write to respond with
+   * @throws IOException on error sending the response
+   */
   static void sendPartialData(FSInputStream in,
-                              OutputStream os,
+                              OutputStream out,
                               HttpServletResponse response,
                               long contentLength,
-                              List<?> ranges)
-  throws IOException {
-
+                              List<InclusiveByteRange> ranges)
+      throws IOException {
     if (ranges == null || ranges.size() != 1) {
-      //  if there are no satisfiable ranges, or if multiple ranges are
-      // requested (we don't support multiple range requests), send 416 response
       response.setContentLength(0);
-      int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE;
-      response.setStatus(status);
-      response.setHeader("Content-Range", 
+      response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
+      response.setHeader("Content-Range",
                 InclusiveByteRange.to416HeaderRangeString(contentLength));
     } else {
-      //  if there is only a single valid range (must be satisfiable 
-      //  since were here now), send that range with a 206 response
-      InclusiveByteRange singleSatisfiableRange =
-        (InclusiveByteRange)ranges.get(0);
+      InclusiveByteRange singleSatisfiableRange = ranges.get(0);
       long singleLength = singleSatisfiableRange.getSize(contentLength);
       response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
       response.setHeader("Content-Range", 
         singleSatisfiableRange.toHeaderRangeString(contentLength));
-      System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength));
-      System.out.println("singleLength: "+singleLength);
-      
-      StreamFile.writeTo(in,
-                         os,
-                         singleSatisfiableRange.getFirst(contentLength),
-                         singleLength);
+      copyFromOffset(in, out,
+                     singleSatisfiableRange.getFirst(contentLength),
+                     singleLength);
     }
   }
-  
-  static void writeTo(FSInputStream in,
-                      OutputStream os,
-                      long start,
-                      long count) 
-  throws IOException {
-    byte buf[] = new byte[4096];
-    long bytesRemaining = count;
-    int bytesRead;
-    int bytesToRead;
-
-    in.seek(start);
-
-    while (true) {
-      // number of bytes to read this iteration
-      bytesToRead = (int)(bytesRemaining<buf.length ? 
-                                                      bytesRemaining:
-                                                      buf.length);
-      
-      // number of bytes actually read this iteration
-      bytesRead = in.read(buf, 0, bytesToRead);
-
-      // if we can't read anymore, break
-      if (bytesRead == -1) {
-        break;
-      } 
-      
-      os.write(buf, 0, bytesRead);
-
-      bytesRemaining -= bytesRead;
-
-      // if we don't need to read anymore, break
-      if (bytesRemaining <= 0) {
-        break;
-      }
 
-    } 
+  /* Copy count bytes at the given offset from one stream to another */
+  static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
+      long count) throws IOException {
+    in.seek(offset);
+    IOUtils.copyBytes(in, out, count, false);
   }
 }



Mime
View raw message