hadoop-hdfs-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r885143 [14/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Date: Sat, 28 Nov 2009 20:06:08 GMT
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java Sat Nov 28 20:05:56 2009
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.common.ThreadLocalDateFormat;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
@@ -28,12 +28,13 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
-import java.util.TimeZone;
 import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -45,12 +46,13 @@
 public class ListPathsServlet extends DfsServlet {
   /** For java.io.Serializable */
   private static final long serialVersionUID = 1L;
-  public static final ThreadLocalDateFormat df = 
-    new ThreadLocalDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
 
-  static {
-    df.setTimeZone(TimeZone.getTimeZone("UTC"));
-  }
+  public static final ThreadLocal<SimpleDateFormat> df =
+    new ThreadLocal<SimpleDateFormat>() {
+      protected SimpleDateFormat initialValue() {
+        return HftpFileSystem.getDateFormat();
+      }
+    };
 
   /**
    * Write a node to output.
@@ -58,10 +60,11 @@
    * For files, it also includes size, replication and block-size. 
    */
   static void writeInfo(FileStatus i, XMLOutputter doc) throws IOException {
+    final SimpleDateFormat ldf = df.get();
     doc.startTag(i.isDir() ? "directory" : "file");
     doc.attribute("path", i.getPath().toUri().getPath());
-    doc.attribute("modified", df.format(new Date(i.getModificationTime())));
-    doc.attribute("accesstime", df.format(new Date(i.getAccessTime())));
+    doc.attribute("modified", ldf.format(new Date(i.getModificationTime())));
+    doc.attribute("accesstime", ldf.format(new Date(i.getAccessTime())));
     if (!i.isDir()) {
       doc.attribute("size", String.valueOf(i.getLen()));
       doc.attribute("replication", String.valueOf(i.getReplication()));
@@ -92,7 +95,7 @@
     root.put("recursive", recur ? "yes" : "no");
     root.put("filter", filter);
     root.put("exclude", exclude);
-    root.put("time", df.format(new Date()));
+    root.put("time", df.get().format(new Date()));
     root.put("version", VersionInfo.getVersion());
     return root;
   }
@@ -149,7 +152,12 @@
       while (!pathstack.empty()) {
         String p = pathstack.pop();
         try {
-          for (FileStatus i : nnproxy.getListing(p)) {
+          FileStatus[] listing = nnproxy.getListing(p);
+          if (listing == null) {
+            LOG.warn("ListPathsServlet - Path " + p + " does not exist");
+            continue;
+          }
+          for (FileStatus i : listing) {
             if (exclude.matcher(i.getPath().getName()).matches()
                 || !filter.matcher(i.getPath().getName()).matches()) {
               continue;
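
The hunk above drops the custom ThreadLocalDateFormat in favour of a plain ThreadLocal<SimpleDateFormat>, since SimpleDateFormat itself is not safe for concurrent use by servlet threads. A minimal, self-contained sketch of that pattern follows; the pattern string and UTC time zone mirror the code being removed here, while the real servlet now obtains its format from HftpFileSystem.getDateFormat().

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class ThreadLocalDateFormatSketch {
      // Each thread lazily gets its own SimpleDateFormat instance,
      // so concurrent requests never share (and corrupt) one formatter.
      private static final ThreadLocal<SimpleDateFormat> DF =
          new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
              SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
              f.setTimeZone(TimeZone.getTimeZone("UTC"));
              return f;
            }
          };

      public static void main(String[] args) {
        // Safe to call from any number of threads.
        System.out.println(DF.get().format(new Date()));
      }
    }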

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Sat Nov 28 20:05:56 2009
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -32,8 +33,10 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
@@ -46,6 +49,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -62,14 +66,16 @@
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -175,7 +181,7 @@
   /** Return the {@link FSNamesystem} object.
    * @return {@link FSNamesystem} object.
    */
-  public FSNamesystem getNamesystem() {
+  FSNamesystem getNamesystem() {
     return namesystem;
   }
 
@@ -244,11 +250,11 @@
 
   protected InetSocketAddress getHttpServerAddress(Configuration conf) {
     return  NetUtils.createSocketAddr(
-        conf.get("dfs.http.address", "0.0.0.0:50070"));
+        conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50070"));
   }
 
   protected void setHttpServerAddress(Configuration conf){
-    conf.set("dfs.http.address", getHostPortString(httpAddress));
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, getHostPortString(httpAddress));
   }
 
   protected void loadNamesystem(Configuration conf) throws IOException {
@@ -337,10 +343,11 @@
     this.httpServer = new HttpServer("hdfs", infoHost, infoPort, 
         infoPort == 0, conf);
     if (conf.getBoolean("dfs.https.enable", false)) {
-      boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
+      boolean needClientAuth = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+                                               DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
-          "dfs.https.address", infoHost + ":" + 0));
-      Configuration sslConf = new Configuration(false);
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
+      Configuration sslConf = new HdfsConfiguration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
       this.httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
@@ -638,10 +645,16 @@
   }
 
   /** {@inheritDoc} */
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return namesystem.getServerDefaults();
+  }
+
+  /** {@inheritDoc} */
   public void create(String src, 
                      FsPermission masked,
                              String clientName, 
                              EnumSetWritable<CreateFlag> flag,
+                             boolean createParent,
                              short replication,
                              long blockSize
                              ) throws IOException {
@@ -657,7 +670,7 @@
     namesystem.startFile(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
             null, masked),
-        clientName, clientMachine, flag.get(), replication, blockSize);
+        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
     myMetrics.numFilesCreated.inc();
     myMetrics.numCreateFileOps.inc();
   }
@@ -693,13 +706,30 @@
     namesystem.setOwner(src, username, groupname);
   }
 
-  /**
-   */
-  public LocatedBlock addBlock(String src, 
-                               String clientName) throws IOException {
+
+  @Override
+  public LocatedBlock addBlock(String src, String clientName,
+                               Block previous) throws IOException {
+    return addBlock(src, clientName, previous, null);
+  }
+
+  @Override
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               Block previous,
+                               DatanodeInfo[] excludedNodes
+                               ) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
-    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
+    HashMap<Node, Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node node:excludedNodes) {
+        excludedNodesSet.put(node, node);
+      }
+    }
+    LocatedBlock locatedBlock = 
+      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;
@@ -718,9 +748,11 @@
   }
 
   /** {@inheritDoc} */
-  public boolean complete(String src, String clientName) throws IOException {
+  public boolean complete(String src, String clientName,
+                          Block last) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
-    CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
+    CompleteFileStatus returnCode =
+      namesystem.completeFile(src, clientName, last);
     if (returnCode == CompleteFileStatus.STILL_WAITING) {
       return false;
     } else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
@@ -749,10 +781,20 @@
   }
 
   /** {@inheritDoc} */
-  public long nextGenerationStamp(Block block) throws IOException{
-    return namesystem.nextGenerationStampForBlock(block);
+  @Override
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+  throws IOException {
+    return namesystem.updateBlockForPipeline(block, clientName);
   }
 
+
+  @Override
+  public void updatePipeline(String clientName, Block oldBlock,
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+  }
+  
   /** {@inheritDoc} */
   public void commitBlockSynchronization(Block block,
       long newgenerationstamp, long newlength,
@@ -766,8 +808,9 @@
     return namesystem.getPreferredBlockSize(filename);
   }
     
-  /**
-   */
+  /** {@inheritDoc} */
+  @Deprecated
+  @Override
   public boolean rename(String src, String dst) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
     if (!checkPathLength(dst)) {
@@ -780,6 +823,25 @@
     }
     return ret;
   }
+  
+  /** 
+   * {@inheritDoc}
+   */
+  public void concat(String trg, String[] src) throws IOException {
+    namesystem.concat(trg, src);
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void rename(String src, String dst, Options.Rename... options) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.renameTo(src, dst, options);
+    myMetrics.numFilesRenamed.inc();
+  }
 
   /**
    */
@@ -813,7 +875,7 @@
   }
     
   /** {@inheritDoc} */
-  public boolean mkdirs(String src, FsPermission masked) throws IOException {
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
     if (!checkPathLength(src)) {
       throw new IOException("mkdirs: Pathname too long.  Limit " 
@@ -821,7 +883,7 @@
     }
     return namesystem.mkdirs(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
-            null, masked));
+            null, masked), createParent);
   }
 
   /**
@@ -900,11 +962,11 @@
 
   /**
    * Refresh the list of datanodes that the namenode should allow to  
-   * connect.  Re-reads conf by creating new Configuration object and 
+   * connect.  Re-reads conf by creating new HdfsConfiguration object and 
    * uses the files list in the configuration to update the list. 
    */
   public void refreshNodes() throws IOException {
-    namesystem.refreshNodes(new Configuration());
+    namesystem.refreshNodes(new HdfsConfiguration());
   }
 
   /**
@@ -1235,7 +1297,7 @@
   public static NameNode createNameNode(String argv[], 
                                  Configuration conf) throws IOException {
     if (conf == null)
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
     StartupOption startOpt = parseArguments(argv);
     if (startOpt == null) {
       printUsage();
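
Among the changes above, the new addBlock overload turns the client-supplied DatanodeInfo[] of excluded datanodes into a HashMap<Node, Node> that doubles as a membership set for block placement. A small sketch of that conversion follows; the nested Node class is a hypothetical stand-in for org.apache.hadoop.net.Node, used only to keep the example self-contained.

    import java.util.HashMap;
    import java.util.Map;

    public class ExcludedNodesSketch {
      // Hypothetical stand-in for org.apache.hadoop.net.Node.
      static class Node {
        final String name;
        Node(String name) { this.name = name; }
        @Override public String toString() { return name; }
      }

      // Mirrors the conversion in NameNode.addBlock: null means "no exclusions",
      // otherwise each node maps to itself so the map acts as a set.
      static Map<Node, Node> toExclusionSet(Node[] excludedNodes) {
        if (excludedNodes == null) {
          return null;
        }
        HashMap<Node, Node> set = new HashMap<Node, Node>(excludedNodes.length);
        for (Node node : excludedNodes) {
          set.put(node, node);
        }
        return set;
      }

      public static void main(String[] args) {
        Node[] excluded = { new Node("dn1:50010"), new Node("dn2:50010") };
        System.out.println(toExclusionSet(excluded).keySet());
      }
    }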

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Sat Nov 28 20:05:56 2009
@@ -253,8 +253,8 @@
                     locs.length + " replica(s).");
       }
       // verify block placement policy
-      int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
-                    lBlk, targetFileReplication, networktopology);
+      int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
+                           verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
       if (missingRacks > 0) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
@@ -335,7 +335,7 @@
     String target = lostFound + file.getPath();
     String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
     try {
-      if (!namenode.mkdirs(target, file.getPermission())) {
+      if (!namenode.mkdirs(target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         return;
       }
@@ -501,7 +501,7 @@
       
       final FileStatus lfStatus = dfs.getFileInfo(lfName);
       if (lfStatus == null) { // not exists
-        lfInitedOk = dfs.mkdirs(lfName);
+        lfInitedOk = dfs.mkdirs(lfName, null, true);
         lostFound = lfName;
       } else if (!lfStatus.isDir()) { // exists but not a directory
         LOG.warn("Cannot use /lost+found : a regular file with this name exists.");

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Sat Nov 28 20:05:56 2009
@@ -28,6 +28,8 @@
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.jsp.JspWriter;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
@@ -38,6 +40,8 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
+import org.znerd.xmlenc.*;
+
 class NamenodeJspHelper {
   static String getSafeModeText(FSNamesystem fsn) {
     if (!fsn.isInSafeMode())
@@ -161,6 +165,9 @@
       ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       fsn.DFSNodesStatus(live, dead);
 
+      ArrayList<DatanodeDescriptor> decommissioning = fsn
+          .getDecommissioningNodes();
+
       sorterField = request.getParameter("sorter/field");
       sorterOrder = request.getParameter("sorter/order");
       if (sorterField == null)
@@ -213,7 +220,14 @@
           + "<a href=\"dfsnodelist.jsp?whatNodes=LIVE\">Live Nodes</a> "
           + colTxt() + ":" + colTxt() + live.size() + rowTxt() + colTxt()
           + "<a href=\"dfsnodelist.jsp?whatNodes=DEAD\">Dead Nodes</a> "
-          + colTxt() + ":" + colTxt() + dead.size() + "</table></div><br>\n");
+          + colTxt() + ":" + colTxt() + dead.size() + rowTxt() + colTxt()
+          + "<a href=\"dfsnodelist.jsp?whatNodes=DECOMMISSIONING\">"
+          + "Decommissioning Nodes</a> "
+          + colTxt() + ":" + colTxt() + decommissioning.size() 
+          + rowTxt() + colTxt()
+          + "Number of Under-Replicated Blocks" + colTxt() + ":" + colTxt()
+          + fsn.getUnderReplicatedBlocks()
+          + "</table></div><br>\n");
 
       if (live.isEmpty() && dead.isEmpty()) {
         out.print("There are no datanodes in the cluster");
@@ -278,6 +292,44 @@
       return ret;
     }
 
+    void generateDecommissioningNodeData(JspWriter out, DatanodeDescriptor d,
+        String suffix, boolean alive, int nnHttpPort) throws IOException {
+      String url = "http://" + d.getHostName() + ":" + d.getInfoPort()
+          + "/browseDirectory.jsp?namenodeInfoPort=" + nnHttpPort + "&dir="
+          + URLEncoder.encode("/", "UTF-8");
+
+      String name = d.getHostName() + ":" + d.getPort();
+      if (!name.matches("\\d+\\.\\d+.\\d+\\.\\d+.*"))
+        name = name.replaceAll("\\.[^.:]*", "");
+      int idx = (suffix != null && name.endsWith(suffix)) ? name
+          .indexOf(suffix) : -1;
+
+      out.print(rowTxt() + "<td class=\"name\"><a title=\"" + d.getHost() + ":"
+          + d.getPort() + "\" href=\"" + url + "\">"
+          + ((idx > 0) ? name.substring(0, idx) : name) + "</a>"
+          + ((alive) ? "" : "\n"));
+      if (!alive) {
+        return;
+      }
+
+      long decommRequestTime = d.decommissioningStatus.getStartTime();
+      long timestamp = d.getLastUpdate();
+      long currentTime = System.currentTimeMillis();
+      long hoursSinceDecommStarted = (currentTime - decommRequestTime)/3600000;
+      long remainderMinutes = ((currentTime - decommRequestTime)/60000) % 60;
+      out.print("<td class=\"lastcontact\"> "
+          + ((currentTime - timestamp) / 1000)
+          + "<td class=\"underreplicatedblocks\">"
+          + d.decommissioningStatus.getUnderReplicatedBlocks()
+          + "<td class=\"blockswithonlydecommissioningreplicas\">"
+          + d.decommissioningStatus.getDecommissionOnlyReplicas() 
+          + "<td class=\"underrepblocksinfilesunderconstruction\">"
+          + d.decommissioningStatus.getUnderReplicatedInOpenFiles()
+          + "<td class=\"timesincedecommissionrequest\">"
+          + hoursSinceDecommStarted + " hrs " + remainderMinutes + " mins"
+          + "\n");
+    }
+    
     void generateNodeData(JspWriter out, DatanodeDescriptor d,
         String suffix, boolean alive, int nnHttpPort) throws IOException {
       /*
@@ -428,7 +480,7 @@
             }
           }
           out.print("</table>\n");
-        } else {
+        } else if (whatNodes.equals("DEAD")) {
 
           out.print("<br> <a name=\"DeadNodes\" id=\"title\"> "
               + " Dead Datanodes : " + dead.size() + "</a><br><br>\n");
@@ -444,9 +496,229 @@
 
             out.print("</table>\n");
           }
+        } else if (whatNodes.equals("DECOMMISSIONING")) {
+          // Decommissioning Nodes
+          ArrayList<DatanodeDescriptor> decommissioning = nn.getNamesystem()
+              .getDecommissioningNodes();
+          out.print("<br> <a name=\"DecommissioningNodes\" id=\"title\"> "
+              + " Decommissioning Datanodes : " + decommissioning.size()
+              + "</a><br><br>\n");
+          if (decommissioning.size() > 0) {
+            out.print("<table border=1 cellspacing=0> <tr class=\"headRow\"> "
+                + "<th " + nodeHeaderStr("name") 
+                + "> Node <th " + nodeHeaderStr("lastcontact")
+                + "> Last <br>Contact <th "
+                + nodeHeaderStr("underreplicatedblocks")
+                + "> Under Replicated Blocks <th "
+                + nodeHeaderStr("blockswithonlydecommissioningreplicas")
+                + "> Blocks With No <br> Live Replicas <th "
+                + nodeHeaderStr("underrepblocksinfilesunderconstruction")
+                + "> Under Replicated Blocks <br> In Files Under Construction" 
+                + " <th " + nodeHeaderStr("timesincedecommissionrequest")
+                + "> Time Since Decommissioning Started"
+                );
+
+            JspHelper.sortNodeList(decommissioning, "name", "ASC");
+            for (int i = 0; i < decommissioning.size(); i++) {
+              generateDecommissioningNodeData(out, decommissioning.get(i),
+                  port_suffix, true, nnHttpPort);
+            }
+            out.print("</table>\n");
+          }
         }
         out.print("</div>");
       }
     }
   }
-}
\ No newline at end of file
+  
+  // utility class used in block_info_xml.jsp
+  static class XMLBlockInfo {
+    final Block block;
+    final INodeFile inode;
+    final FSNamesystem fsn;
+    
+    public XMLBlockInfo(FSNamesystem fsn, Long blockId) {
+      this.fsn = fsn;
+      if (blockId == null) {
+        this.block = null;
+        this.inode = null;
+      } else {
+        this.block = new Block(blockId);
+        this.inode = fsn.blockManager.getINode(block);
+      }
+    }
+
+    private String getLocalParentDir(INode inode) {
+      StringBuilder pathBuf = new StringBuilder();
+      INode node = inode;
+      
+      // loop up to directory root, prepending each directory name to buffer
+      while ((node = node.getParent()) != null && node.getLocalName() != "") {
+        pathBuf.insert(0, '/').insert(0, node.getLocalName());
+      }
+
+      return pathBuf.toString();
+    }
+
+    public void toXML(XMLOutputter doc) throws IOException {
+      doc.startTag("block_info");
+      if (block == null) {
+        doc.startTag("error");
+        doc.pcdata("blockId must be a Long");
+        doc.endTag();
+      }else{
+        doc.startTag("block_id");
+        doc.pcdata(""+block.getBlockId());
+        doc.endTag();
+
+        doc.startTag("block_name");
+        doc.pcdata(block.getBlockName());
+        doc.endTag();
+
+        if (inode != null) {
+          doc.startTag("file");
+
+          doc.startTag("local_name");
+          doc.pcdata(inode.getLocalName());
+          doc.endTag();
+
+          doc.startTag("local_directory");
+          doc.pcdata(getLocalParentDir(inode));
+          doc.endTag();
+
+          doc.startTag("user_name");
+          doc.pcdata(inode.getUserName());
+          doc.endTag();
+
+          doc.startTag("group_name");
+          doc.pcdata(inode.getGroupName());
+          doc.endTag();
+
+          doc.startTag("is_directory");
+          doc.pcdata(""+inode.isDirectory());
+          doc.endTag();
+
+          doc.startTag("access_time");
+          doc.pcdata(""+inode.getAccessTime());
+          doc.endTag();
+
+          doc.startTag("is_under_construction");
+          doc.pcdata(""+inode.isUnderConstruction());
+          doc.endTag();
+
+          doc.startTag("ds_quota");
+          doc.pcdata(""+inode.getDsQuota());
+          doc.endTag();
+
+          doc.startTag("permission_status");
+          doc.pcdata(inode.getPermissionStatus().toString());
+          doc.endTag();
+
+          doc.startTag("replication");
+          doc.pcdata(""+inode.getReplication());
+          doc.endTag();
+
+          doc.startTag("disk_space_consumed");
+          doc.pcdata(""+inode.diskspaceConsumed());
+          doc.endTag();
+
+          doc.startTag("preferred_block_size");
+          doc.pcdata(""+inode.getPreferredBlockSize());
+          doc.endTag();
+
+          doc.endTag(); // </file>
+        } 
+
+        doc.startTag("replicas");
+       
+        if (fsn.blockManager.blocksMap.contains(block)) {
+          Iterator<DatanodeDescriptor> it =
+            fsn.blockManager.blocksMap.nodeIterator(block);
+
+          while (it.hasNext()) {
+            doc.startTag("replica");
+
+            DatanodeDescriptor dd = it.next();
+
+            doc.startTag("host_name");
+            doc.pcdata(dd.getHostName());
+            doc.endTag();
+
+            boolean isCorrupt = fsn.getCorruptReplicaBlockIds(0,
+                                  block.getBlockId()) != null;
+            
+            doc.startTag("is_corrupt");
+            doc.pcdata(""+isCorrupt);
+            doc.endTag();
+            
+            doc.endTag(); // </replica>
+          }
+
+        } 
+        doc.endTag(); // </replicas>
+                
+      }
+      
+      doc.endTag(); // </block_info>
+      
+    }
+  }
+  
+  // utility class used in corrupt_replicas_xml.jsp
+  static class XMLCorruptBlockInfo {
+    final FSNamesystem fsn;
+    final Configuration conf;
+    final Long startingBlockId;
+    final int numCorruptBlocks;
+    
+    public XMLCorruptBlockInfo(FSNamesystem fsn, Configuration conf,
+                               int numCorruptBlocks, Long startingBlockId) {
+      this.fsn = fsn;
+      this.conf = conf;
+      this.numCorruptBlocks = numCorruptBlocks;
+      this.startingBlockId = startingBlockId;
+    }
+
+
+    public void toXML(XMLOutputter doc) throws IOException {
+      
+      doc.startTag("corrupt_block_info");
+      
+      if (numCorruptBlocks < 0 || numCorruptBlocks > 100) {
+        doc.startTag("error");
+        doc.pcdata("numCorruptBlocks must be >= 0 and <= 100");
+        doc.endTag();
+      }
+      
+      doc.startTag("dfs_replication");
+      doc.pcdata(""+conf.getInt("dfs.replication", 3));
+      doc.endTag();
+      
+      doc.startTag("num_missing_blocks");
+      doc.pcdata(""+fsn.getMissingBlocksCount());
+      doc.endTag();
+      
+      doc.startTag("num_corrupt_replica_blocks");
+      doc.pcdata(""+fsn.getCorruptReplicaBlocks());
+      doc.endTag();
+     
+      doc.startTag("corrupt_replica_block_ids");
+      long[] corruptBlockIds
+        = fsn.getCorruptReplicaBlockIds(numCorruptBlocks,
+                                        startingBlockId);
+      if (corruptBlockIds != null) {
+        for (Long blockId: corruptBlockIds) {
+          doc.startTag("block_id");
+          doc.pcdata(""+blockId);
+          doc.endTag();
+        }
+      }
+      
+      doc.endTag(); // </corrupt_replica_block_ids>
+
+      doc.endTag(); // </corrupt_block_info>
+      
+      doc.getWriter().flush();
+    }
+  }    
+}
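
The new XMLBlockInfo and XMLCorruptBlockInfo helpers above emit their output through xmlenc's streaming XMLOutputter, the same writer the existing servlets use. A standalone sketch of that API is below; the (Writer, "UTF-8") constructor and the declaration() call are assumptions based on how other servlets in this tree create the outputter.

    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;

    import org.znerd.xmlenc.XMLOutputter;

    public class XmlencSketch {
      public static void main(String[] args) throws IOException {
        Writer out = new OutputStreamWriter(System.out, "UTF-8");
        XMLOutputter doc = new XMLOutputter(out, "UTF-8");

        doc.declaration();                      // <?xml ... ?>
        doc.startTag("block_info");             // <block_info>
        doc.startTag("block_id");
        doc.pcdata(String.valueOf(1073741825L));
        doc.endTag();                           // </block_id>
        doc.startTag("is_corrupt");
        doc.attribute("checked", "true");       // attribute on the open tag
        doc.pcdata("false");
        doc.endTag();                           // </is_corrupt>
        doc.endTag();                           // </block_info>

        doc.getWriter().flush();
      }
    }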

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Sat Nov 28 20:05:56 2009
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -117,7 +119,7 @@
    */
   private void initialize(Configuration conf) throws IOException {
     // initiate Java VM metrics
-    JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
+    JvmMetrics.init("SecondaryNameNode", conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY));
     
     // Create connection to the namenode.
     shouldRun = true;
@@ -138,12 +140,15 @@
     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
 
     // Initialize other scheduling parameters from the configuration
-    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
-    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
+    checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 
+                                    DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
+    checkpointSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY, 
+                                  DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT);
 
     // initialize the webserver for uploading files.
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
-        conf.get("dfs.secondary.http.address", "0.0.0.0:50090"));
+        conf.get(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
+                 DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT));
     infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
     infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
@@ -156,7 +161,7 @@
 
     // The web-server port can be ephemeral... ensure we have the correct info
     infoPort = infoServer.getPort();
-    conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort); 
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" +infoPort); 
     LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
     LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
              "(" + checkpointPeriod/60 + " min)");
@@ -280,7 +285,8 @@
     if (!FSConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
       throw new IOException("This is not a DFS");
     }
-    String configuredAddress = conf.get("dfs.http.address", "0.0.0.0:50070");
+    String configuredAddress = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
+                                        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
     InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
     if (sockAddr.getAddress().isAnyLocalAddress()) {
       return fsName.getHost() + ":" + sockAddr.getPort();
@@ -455,7 +461,7 @@
    */
   public static void main(String[] argv) throws Exception {
     StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
-    Configuration tconf = new Configuration();
+    Configuration tconf = new HdfsConfiguration();
     if (argv.length >= 1) {
       SecondaryNameNode secondary = new SecondaryNameNode(tconf);
       int ret = secondary.processArgs(argv);

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Sat Nov 28 20:05:56 2009
@@ -21,17 +21,19 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
-
+import java.util.Enumeration;
+import java.util.List;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.mortbay.jetty.InclusiveByteRange;
 
 public class StreamFile extends DfsServlet {
   /** for java.io.Serializable */
@@ -39,7 +41,7 @@
 
   static InetSocketAddress nameNodeAddr;
   static DataNode datanode = null;
-  private static final Configuration masterConf = new Configuration();
+  private static final Configuration masterConf = new HdfsConfiguration();
   static {
     if ((datanode = DataNode.getDataNode()) != null) {
       nameNodeAddr = datanode.getNameNodeAddr();
@@ -49,7 +51,7 @@
   /** getting a client for connecting to dfs */
   protected DFSClient getDFSClient(HttpServletRequest request)
       throws IOException {
-    Configuration conf = new Configuration(masterConf);
+    Configuration conf = new HdfsConfiguration(masterConf);
     UnixUserGroupInformation.saveToConf(conf,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, getUGI(request));
     return new DFSClient(nameNodeAddr, conf);
@@ -65,22 +67,104 @@
       out.print("Invalid input");
       return;
     }
-    DFSClient dfs = getDFSClient(request);
+    
+    Enumeration reqRanges = request.getHeaders("Range");
+    if (reqRanges != null && !reqRanges.hasMoreElements())
+      reqRanges = null;
+
+    DFSClient dfs = getDFSClient(request);  
+    long fileLen = dfs.getFileInfo(filename).getLen();
     FSInputStream in = dfs.open(filename);
     OutputStream os = response.getOutputStream();
-    response.setHeader("Content-Disposition", "attachment; filename=\"" + 
-                       filename + "\"");
-    response.setContentType("application/octet-stream");
-    byte buf[] = new byte[4096];
+
     try {
-      int bytesRead;
-      while ((bytesRead = in.read(buf)) != -1) {
-        os.write(buf, 0, bytesRead);
+      if (reqRanges != null) {
+        List ranges = InclusiveByteRange.satisfiableRanges(reqRanges,
+                                                           fileLen);
+        StreamFile.sendPartialData(in, os, response, fileLen, ranges);
+      } else {
+        // No ranges, so send entire file
+        response.setHeader("Content-Disposition", "attachment; filename=\"" + 
+                           filename + "\"");
+        response.setContentType("application/octet-stream");
+        StreamFile.writeTo(in, os, 0L, fileLen);
       }
     } finally {
       in.close();
       os.close();
       dfs.close();
+    }      
+  }
+  
+  static void sendPartialData(FSInputStream in,
+                              OutputStream os,
+                              HttpServletResponse response,
+                              long contentLength,
+                              List ranges)
+  throws IOException {
+
+    if (ranges == null || ranges.size() != 1) {
+      //  if there are no satisfiable ranges, or if multiple ranges are
+      // requested (we don't support multiple range requests), send 416 response
+      response.setContentLength(0);
+      int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE;
+      response.setStatus(status);
+      response.setHeader("Content-Range", 
+                InclusiveByteRange.to416HeaderRangeString(contentLength));
+    } else {
+      //  if there is only a single valid range (must be satisfiable 
+      //  since were here now), send that range with a 206 response
+      InclusiveByteRange singleSatisfiableRange =
+        (InclusiveByteRange)ranges.get(0);
+      long singleLength = singleSatisfiableRange.getSize(contentLength);
+      response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
+      response.setHeader("Content-Range", 
+        singleSatisfiableRange.toHeaderRangeString(contentLength));
+      System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength));
+      System.out.println("singleLength: "+singleLength);
+      
+      StreamFile.writeTo(in,
+                         os,
+                         singleSatisfiableRange.getFirst(contentLength),
+                         singleLength);
     }
   }
+  
+  static void writeTo(FSInputStream in,
+                      OutputStream os,
+                      long start,
+                      long count) 
+  throws IOException {
+    byte buf[] = new byte[4096];
+    long bytesRemaining = count;
+    int bytesRead;
+    int bytesToRead;
+
+    in.seek(start);
+
+    while (true) {
+      // number of bytes to read this iteration
+      bytesToRead = (int)(bytesRemaining<buf.length ? 
+                                                      bytesRemaining:
+                                                      buf.length);
+      
+      // number of bytes actually read this iteration
+      bytesRead = in.read(buf, 0, bytesToRead);
+
+      // if we can't read anymore, break
+      if (bytesRead == -1) {
+        break;
+      } 
+      
+      os.write(buf, 0, bytesRead);
+
+      bytesRemaining -= bytesRead;
+
+      // if we don't need to read anymore, break
+      if (bytesRemaining <= 0) {
+        break;
+      }
+
+    } 
+  }
 }
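
The servlet rewrite above makes StreamFile honour a single HTTP Range header: one satisfiable range yields a 206 with a Content-Range header, anything else yields 416, and requests without a Range header stream the whole file as before. The sketch below reproduces the bounded copy that writeTo performs, but over plain java.io streams instead of FSInputStream; the class and method names are illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class RangeCopySketch {
      /** Copy exactly count bytes starting at offset start, 4 KB at a time. */
      static void copyRange(InputStream in, OutputStream os, long start, long count)
          throws IOException {
        if (in.skip(start) != start) {      // FSInputStream would seek() instead
          throw new IOException("could not skip to offset " + start);
        }
        byte[] buf = new byte[4096];
        long remaining = count;
        while (remaining > 0) {
          int toRead = (int) Math.min(remaining, buf.length);
          int read = in.read(buf, 0, toRead);
          if (read == -1) {
            break;                          // stream ended early
          }
          os.write(buf, 0, read);
          remaining -= read;
        }
      }

      public static void main(String[] args) throws IOException {
        byte[] data = "0123456789abcdef".getBytes("US-ASCII");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Equivalent of "Range: bytes=4-9": start at byte 4, copy 6 bytes.
        copyRange(new ByteArrayInputStream(data), out, 4, 6);
        System.out.println(out.toString("US-ASCII"));   // prints 456789
      }
    }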

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Sat Nov 28 20:05:56 2009
@@ -26,7 +26,7 @@
  * Blocks have only one replicas has the highest
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
-  static final int LEVEL = 3;
+  static final int LEVEL = 4;
   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
       
   /* constructor */
@@ -53,7 +53,7 @@
     }
     return size;
   }
-        
+
   /* Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
     for(TreeSet<Block> set:priorityQueues) {
@@ -71,8 +71,10 @@
                           int curReplicas, 
                           int decommissionedReplicas,
                           int expectedReplicas) {
-    if (curReplicas<0 || curReplicas>=expectedReplicas) {
-      return LEVEL; // no need to replicate
+    if (curReplicas<0) {
+      return LEVEL;
+    } else if (curReplicas>=expectedReplicas) {
+      return 3; // Block doesn't have enough racks
     } else if(curReplicas==0) {
       // If there are zero non-decommissioned replica but there are
       // some decommissioned replicas, then assign them highest priority
@@ -99,7 +101,7 @@
                            int curReplicas, 
                            int decomissionedReplicas,
                            int expectedReplicas) {
-    if(curReplicas<0 || expectedReplicas <= curReplicas) {
+    if(curReplicas<0) {
       return false;
     }
     int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
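
The hunk above grows the number of priority queues from three to four: level 3 now holds blocks that have enough replicas but sit on too few racks, instead of silently dropping them from neededReplication. Below is a standalone sketch of the resulting queue layout; Long block IDs stand in for Block objects, and the descriptions of levels 1 and 2 are paraphrased rather than taken from getPriority verbatim.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;

    public class PriorityQueuesSketch {
      // 0 = highest priority (e.g. a single live replica left),
      // 1-2 = progressively less urgent under-replication,
      // 3 = enough replicas but not enough racks (new in this change).
      static final int LEVEL = 4;

      public static void main(String[] args) {
        List<TreeSet<Long>> queues = new ArrayList<TreeSet<Long>>();
        for (int i = 0; i < LEVEL; i++) {
          queues.add(new TreeSet<Long>());
        }
        queues.get(0).add(1001L);   // block down to one replica
        queues.get(3).add(1002L);   // fully replicated, single rack

        for (int i = 0; i < LEVEL; i++) {
          System.out.println("priority " + i + ": " + queues.get(i));
        }
      }
    }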

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMetrics.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMetrics.java Sat Nov 28 20:05:56 2009
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics.*;
 import org.apache.hadoop.metrics.util.MetricsBase;
 import org.apache.hadoop.metrics.util.MetricsIntValue;
@@ -67,7 +68,7 @@
 
   public FSNamesystemMetrics(FSNamesystem fsNameSystem, Configuration conf) {
     this.fsNameSystem = fsNameSystem;
-    String sessionId = conf.get("session.id");
+    String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
      
     // Create a record for FSNamesystem metrics
     MetricsContext metricsContext = MetricsUtil.getContext("dfs");

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Sat Nov 28 20:05:56 2009
@@ -21,6 +21,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics.*;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.metrics.util.MetricsBase;
@@ -86,7 +87,7 @@
 
       
     public NameNodeMetrics(Configuration conf, NamenodeRole nameNodeRole) {
-      String sessionId = conf.get("session.id");
+      String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
       // Initiate Java VM metrics
       String processName = nameNodeRole.toString();
       JvmMetrics.init(processName, sessionId);

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sat Nov 28 20:05:56 2009
@@ -35,10 +35,9 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 20: SendHeartbeat may return KeyUpdateCommand
-   *     Register returns access keys inside DatanodeRegistration object
+   * 23: nextGenerationStamp() removed.
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 23L;
   
   // error code
   final static int NOTIFY = 0;
@@ -143,12 +142,6 @@
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
   
   /**
-   * @return the next GenerationStamp to be associated with the specified
-   * block. 
-   */
-  public long nextGenerationStamp(Block block) throws IOException;
-
-  /**
    * Commit block synchronization in lease recovery
    */
   public void commitBlockSynchronization(Block block,

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Sat Nov 28 20:05:56 2009
@@ -23,13 +23,13 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.ExportedAccessKeys;
 
 /** 
  * DatanodeRegistration class contains all information the name-node needs

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Sat Nov 28 20:05:56 2009
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
 /** An inter-datanode protocol for updating generation stamp
@@ -31,17 +32,23 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 3: added a finalize parameter to updateBlock
+   * 5: getBlockMetaDataInfo(), updateBlock() removed.
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 5L;
 
-  /** @return the BlockMetaDataInfo of a block;
-   *  null if the block is not found 
+  /**
+   * Initialize a replica recovery.
+   * 
+   * @return actual state of the replica on this data-node or 
+   * null if data-node does not have the replica.
    */
-  BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
+  ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException;
 
   /**
-   * Update the block to the new generation stamp and length.  
+   * Update replica with the new generation stamp and length.  
    */
-  void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
+  Block updateReplicaUnderRecovery(Block oldBlock,
+                                   long recoveryId,
+                                   long newLength) throws IOException;
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java Sat Nov 28 20:05:56 2009
@@ -21,10 +21,10 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.ExportedAccessKeys;
 
 public class KeyUpdateCommand extends DatanodeCommand {
   private ExportedAccessKeys keys;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java Sat Nov 28 20:05:56 2009
@@ -21,9 +21,9 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.security.ExportedAccessKeys;
 
 /*****************************************************************************
  * Protocol that a secondary NameNode uses to communicate with the NameNode.

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Sat Nov 28 20:05:56 2009
@@ -54,6 +54,11 @@
  */
 public class DFSAdmin extends FsShell {
 
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+
   /**
    * An abstract class for the execution of a file system command
    */
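
The static block added above registers hdfs-default.xml and hdfs-site.xml as default resources, so every Configuration created afterwards in the JVM picks up HDFS settings without itself being an HdfsConfiguration. A minimal sketch of the same idea, assuming hadoop-common is on the classpath; default resources that cannot be found are skipped quietly, so the lookup below falls back to its supplied default.

    import org.apache.hadoop.conf.Configuration;

    public class HdfsResourceSketch {
      static {
        // Register the HDFS resources for every Configuration in this JVM,
        // mirroring the DFSAdmin change above.
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Uses the value from hdfs-site.xml / hdfs-default.xml if present,
        // otherwise the fallback of 3.
        System.out.println(conf.getInt("dfs.replication", 3));
      }
    }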

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSck.java Sat Nov 28 20:05:56 2009
@@ -30,6 +30,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
@@ -106,7 +108,8 @@
     }
 
     final StringBuffer url = new StringBuffer("http://");
-    url.append(getConf().get("dfs.http.address", "0.0.0.0:50070"));
+    url.append(getConf().get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, 
+                             DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
     url.append("/fsck?ugi=").append(ugi).append("&path=");
 
     String dir = "/";
@@ -162,7 +165,7 @@
     if ((args.length == 0 ) || ("-files".equals(args[0]))) 
       printUsage();
     else
-      res = ToolRunner.run(new DFSck(new Configuration()), args);
+      res = ToolRunner.run(new DFSck(new HdfsConfiguration()), args);
     System.exit(res);
   }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Sat Nov 28 20:05:56 2009
@@ -96,7 +96,7 @@
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = {-16, -17, -18, -19};
+  private static int [] versions = {-16, -17, -18, -19, -20, -21, -22};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java Sat Nov 28 20:05:56 2009
@@ -45,7 +45,7 @@
     "\n" +
     "The oiv utility will attempt to parse correctly formed image files\n" +
     "and will abort fail with mal-formed image files. Currently the\n" +
-    "supports FSImage layout versions -16 through -19.\n" +
+    "supports FSImage layout versions -16 through -22.\n" +
     "\n" +
     "The tool works offline and does not require a running cluster in\n" +
     "order to process an image file.\n" +

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Sat Nov 28 20:05:56 2009
@@ -17,16 +17,20 @@
  */
 package org.apache.hadoop.fi;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.fi.FiTestUtil.Action;
 import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
+import org.apache.hadoop.fi.FiTestUtil.CountdownConstraint;
+import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Utilities for DataTransferProtocol related tests,
  * e.g. TestFiDataTransferProtocol.
@@ -63,6 +67,16 @@
     /** Simulate action for the statusRead pointcut */
     public final ActionContainer<DatanodeID> fiStatusRead
         = new ActionContainer<DatanodeID>();
+    /** Simulate action for the pipelineAck pointcut */
+    public final ActionContainer<DatanodeID> fiPipelineAck
+        = new ActionContainer<DatanodeID>();
+    /** Simulate action for the pipelineClose pointcut */
+    public final ActionContainer<DatanodeID> fiPipelineClose
+        = new ActionContainer<DatanodeID>();
+    /** Simulate action for the blockFileClose pointcut */
+    public final ActionContainer<DatanodeID> fiBlockFileClose
+        = new ActionContainer<DatanodeID>();
+
     /** Verification action for the pipelineInitNonAppend pointcut */
     public final ActionContainer<Integer> fiPipelineInitErrorNonAppend
         = new ActionContainer<Integer>();
@@ -124,17 +138,44 @@
 
     /** {@inheritDoc} */
     public String toString() {
-      return currentTest + ", index=" + index;
+      return getClass().getSimpleName() + ":" + currentTest
+          + ", index=" + index;
     }
 
-    /** {@inheritDoc}
-     * @param datanodeID*/
+    /** Return a String with this object and the datanodeID. */
     String toString(DatanodeID datanodeID) {
       return "FI: " + this + ", datanode="
           + datanodeID.getName();
     }
   }
 
+  /** An action to set a marker if the DatanodeID is matched. */
+  public static class DatanodeMarkingAction extends DataNodeAction {
+    private final MarkerConstraint marker;
+
+    /** Construct an object. */
+    public DatanodeMarkingAction(String currentTest, int index,
+        MarkerConstraint marker) {
+      super(currentTest, index);
+      this.marker = marker;
+    }
+
+    /** Set the marker if the DatanodeID is matched. */
+    @Override
+    public void run(DatanodeID datanodeid) throws IOException {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(datanodeid);
+      if (p.contains(index, datanodeid)) {
+        marker.mark();
+      }
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return super.toString() + ", " + marker;
+    }
+  }
+
   /** Throws OutOfMemoryError. */
   public static class OomAction extends DataNodeAction {
     /** Create an action for datanode i in the pipeline. */
@@ -173,21 +214,75 @@
     }
   }
 
+  /** Throws an IOException. */
+  public static class IoeAction extends DataNodeAction {
+    private final String error; 
+
+    /** Create an action for datanode i in the pipeline. */
+    public IoeAction(String currentTest, int i, String error) {
+      super(currentTest, i);
+      this.error = error;
+    }
+
+    @Override
+    public void run(DatanodeID id) throws IOException {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id)) {
+        final String s = toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new IOException(s);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+      return error + " " + super.toString();
+    }
+  }
+
   /**
    * Sleep some period of time so that it slows down the datanode
    * or sleep forever so that datanode becomes not responding.
    */
   public static class SleepAction extends DataNodeAction {
-    /** In milliseconds, duration <= 0 means sleeping forever.*/
-    final long duration;
+    /** In milliseconds;
+     * must have (0 <= minDuration < maxDuration) or (maxDuration <= 0).
+     */
+    final long minDuration;
+    /** In milliseconds; maxDuration <= 0 means sleeping forever.*/
+    final long maxDuration;
 
     /**
      * Create an action for datanode i in the pipeline.
      * @param duration In milliseconds, duration <= 0 means sleeping forever.
      */
     public SleepAction(String currentTest, int i, long duration) {
+      this(currentTest, i, duration, duration <= 0? duration: duration+1);
+    }
+
+    /**
+     * Create an action for datanode i in the pipeline.
+     * @param minDuration minimum sleep time
+     * @param maxDuration maximum sleep time
+     */
+    public SleepAction(String currentTest, int i,
+        long minDuration, long maxDuration) {
       super(currentTest, i);
-      this.duration = duration;
+
+      if (maxDuration > 0) {
+        if (minDuration < 0) {
+          throw new IllegalArgumentException("minDuration = " + minDuration
+              + " < 0 but maxDuration = " + maxDuration + " > 0");
+        }
+        if (minDuration >= maxDuration) {
+          throw new IllegalArgumentException(
+              minDuration + " = minDuration >= maxDuration = " + maxDuration);
+        }
+      }
+      this.minDuration = minDuration;
+      this.maxDuration = maxDuration;
     }
 
     @Override
@@ -195,15 +290,21 @@
       final DataTransferTest test = getDataTransferTest();
       final Pipeline p = test.getPipeline(id);
       if (!test.isSuccess() && p.contains(index, id)) {
-        final String s = toString(id) + ", duration=" + duration;
-        FiTestUtil.LOG.info(s);
-        if (duration <= 0) {
+        FiTestUtil.LOG.info(toString(id));
+        if (maxDuration <= 0) {
           for(; true; FiTestUtil.sleep(1000)); //sleep forever
         } else {
-          FiTestUtil.sleep(duration);
+          FiTestUtil.sleep(minDuration, maxDuration);
         }
       }
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+      return super.toString() + ", duration="
+          + (maxDuration <= 0? "infinity": "[" + minDuration + ", " + maxDuration + ")");
+    }
   }
 
   /** Action for pipeline error verification */
@@ -237,4 +338,48 @@
       }
     }
   }
+
+  /**
+   *  Create an OomAction with a CountdownConstraint
+   *  so that it throws OutOfMemoryError if the count is zero.
+   */
+  public static ConstraintSatisfactionAction<DatanodeID> createCountdownOomAction(
+      String currentTest, int i, int count) {
+    return new ConstraintSatisfactionAction<DatanodeID>(
+        new OomAction(currentTest, i), new CountdownConstraint(count));
+  }
+
+  /**
+   *  Create a DoosAction with a CountdownConstraint
+   *  so that it throws DiskOutOfSpaceException if the count is zero.
+   */
+  public static ConstraintSatisfactionAction<DatanodeID> createCountdownDoosAction(
+      String currentTest, int i, int count) {
+    return new ConstraintSatisfactionAction<DatanodeID>(
+        new DoosAction(currentTest, i), new CountdownConstraint(count));
+  }
+
+  /**
+   * Create a SleepAction with a CountdownConstraint
+   * for datanode i in the pipeline.
+   * When the count is zero,
+   * sleep some period of time so that it slows down the datanode,
+   * or sleep forever so that the datanode becomes unresponsive.
+   */
+  public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+      String currentTest, int i, long minDuration, long maxDuration, int count) {
+    return new ConstraintSatisfactionAction<DatanodeID>(
+        new SleepAction(currentTest, i, minDuration, maxDuration),
+        new CountdownConstraint(count));
+  }
+
+  /**
+   * Same as
+   * createCountdownSleepAction(currentTest, i, duration, duration+1, count).
+   */
+  public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+      String currentTest, int i, long duration, int count) {
+    return createCountdownSleepAction(currentTest, i, duration, duration+1,
+        count);
+  }
 }
\ No newline at end of file

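Taken together, the ranged SleepAction, the new IoeAction and the createCountdown* factory methods above are building blocks for fault-injection scenarios. The stand-alone sketch below only constructs such actions to show how they compose; the test name is made up, and attaching the actions to a DataTransferTest is described in comments only, since ActionContainer's registration API is not part of this hunk.

import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

public class FaultCompositionSketch {
  public static void main(String[] args) {
    final String test = "sketchTest";   // hypothetical test name

    // Datanode 1 sleeps between 3 and 10 seconds, but the sleep is skipped
    // for the first two firings (CountdownConstraint initialized to 2).
    ConstraintSatisfactionAction<DatanodeID> slowSecondNode =
        DataTransferTestUtil.createCountdownSleepAction(test, 1, 3000L, 10000L, 2);

    // Datanode 0 throws OutOfMemoryError on its first firing (count of 0).
    ConstraintSatisfactionAction<DatanodeID> oomFirstNode =
        DataTransferTestUtil.createCountdownOomAction(test, 0, 0);

    // In a fault-injection test these would be registered on the current
    // DataTransferTest's ActionContainer fields (fiCallReceivePacket,
    // fiPipelineAck, ...) before the pipeline write starts.
    System.out.println(slowSecondNode);
    System.out.println(oomFirstNode);
  }
}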
Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiConfig.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiConfig.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiConfig.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiConfig.java Sat Nov 28 20:05:56 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fi;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 
 /**
  * This class wraps the logic around fault injection configuration file
@@ -37,7 +38,7 @@
   
   protected static void init () {
     if (conf == null) {
-      conf = new Configuration(false);
+      conf = new HdfsConfiguration(false);
       String configName = System.getProperty(CONFIG_PARAMETER, DEFAULT_CONFIG);
       conf.addResource(configName);
     }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java Sat Nov 28 20:05:56 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fi;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,9 +28,46 @@
   /** Logging */
   public static final Log LOG = LogFactory.getLog(FiTestUtil.class);
 
+  /** Random source */
+  public static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
+    protected Random initialValue() {
+      final Random r = new Random();
+      final long seed = r.nextLong();
+      LOG.info(Thread.currentThread() + ": seed=" + seed);
+      r.setSeed(seed);
+      return r;
+    }
+  };
+
+  /**
+   * Return a random integer uniformly distributed over the interval [min,max).
+   */
+  public static int nextRandomInt(final int min, final int max) {
+    final int d = max - min;
+    if (d <= 0) {
+      throw new IllegalArgumentException("d <= 0, min=" + min + ", max=" + max);
+    }
+    return d == 1? min: min + RANDOM.get().nextInt(d);
+  }
+
+  /**
+   * Return a random long uniformly distributed over the interval [min,max).
+   * Assumes max - min <= Integer.MAX_VALUE.
+   */
+  public static long nextRandomLong(final long min, final long max) {
+    final long d = max - min;
+    if (d <= 0 || d > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "d <= 0 || d > Integer.MAX_VALUE, min=" + min + ", max=" + max);
+    }
+    return d == 1? min: min + RANDOM.get().nextInt((int)d);
+  }
+
   /** Return the method name of the callee. */
   public static String getMethodName() {
-    return Thread.currentThread().getStackTrace()[2].getMethodName();
+    final StackTraceElement[] s = Thread.currentThread().getStackTrace();
+    return s[s.length > 2? 2: s.length - 1].getMethodName();
   }
 
   /**
@@ -44,6 +82,18 @@
     }
   }
 
+  /**
+   * Sleep a random number of milliseconds over the interval [min, max).
+   * If there is an InterruptedException, re-throw it as a RuntimeException.
+   */
+  public static void sleep(final long min, final long max) {
+    final long n = nextRandomLong(min, max);
+    LOG.info(Thread.currentThread().getName() + " sleeps for " + n + "ms");
+    if (n > 0) {
+      sleep(n);
+    }
+  }
+
   /** Action interface */
   public static interface Action<T> {
     /** Run the action with the parameter. */
@@ -67,4 +117,89 @@
       }
     }
   }
+
+  /** Constraint interface */
+  public static interface Constraint {
+    /** Is this constraint satisfied? */
+    public boolean isSatisfied();
+  }
+
+  /** Counting down, the constraint is satisfied if the count is zero. */
+  public static class CountdownConstraint implements Constraint {
+    private int count;
+
+    /** Initialize the count. */
+    public CountdownConstraint(int count) {
+      if (count < 0) {
+        throw new IllegalArgumentException(count + " = count < 0");
+      }
+      this.count = count;
+    }
+
+    /** Count down; the constraint is satisfied once the count reaches zero. */
+    public boolean isSatisfied() {
+      if (count > 0) {
+        count--;
+        return false;
+      }
+      return true;
+    }
+  }
+  
+  /** An action is fired if all the constraints are satisfied. */
+  public static class ConstraintSatisfactionAction<T> implements Action<T> {
+    private final Action<T> action;
+    private final Constraint[] constraints;
+    
+    /** Constructor */
+    public ConstraintSatisfactionAction(
+        Action<T> action, Constraint... constraints) {
+      this.action = action;
+      this.constraints = constraints;
+    }
+
+    /**
+     * Fire the action if all the constraints are satisfied.
+     * Evaluation short-circuits on the first unsatisfied constraint.
+     */
+    @Override
+    public final void run(T parameter) throws IOException {
+      for(Constraint c : constraints) {
+        if (!c.isSatisfied()) {
+          return;
+        }
+      }
+
+      //all constraints are satisfied, fire the action
+      action.run(parameter);
+    }
+  }
+
+  /** A MarkerConstraint is satisfied if it is marked. */
+  public static class MarkerConstraint implements Constraint {
+    private final String name;
+    private boolean marked = false;
+
+    /** Construct an object. */
+    public MarkerConstraint(String name) {
+      this.name = name;
+    }
+
+    /** Set marker to be marked. */
+    public void mark() {
+      marked = true;
+      LOG.info("Marking this " + this);
+    }
+
+    /** Is the marker marked? */
+    @Override
+    public boolean isSatisfied() {
+      return marked;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return getClass().getSimpleName() + "[" + name + ": " + marked + "]";
+    }
+  }
 }
\ No newline at end of file

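The constraint machinery added to FiTestUtil composes as follows: a ConstraintSatisfactionAction fires its wrapped action only once every constraint reports satisfied, and it stops at the first constraint that does not. The self-contained sketch below exercises exactly that behaviour; only the FiTestUtil types come from this patch, while the printing action is invented for illustration.

import java.io.IOException;

import org.apache.hadoop.fi.FiTestUtil.Action;
import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
import org.apache.hadoop.fi.FiTestUtil.CountdownConstraint;
import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;

public class ConstraintSketch {
  /** Stand-in action that just prints; not part of the patch. */
  static class PrintAction implements Action<String> {
    @Override
    public void run(String parameter) throws IOException {
      System.out.println("fired for " + parameter);
    }
  }

  public static void main(String[] args) throws IOException {
    final MarkerConstraint marker = new MarkerConstraint("sketch");
    final ConstraintSatisfactionAction<String> action =
        new ConstraintSatisfactionAction<String>(
            new PrintAction(), new CountdownConstraint(2), marker);

    action.run("call-1");   // countdown 2 -> 1, nothing fires
    action.run("call-2");   // countdown 1 -> 0, nothing fires
    action.run("call-3");   // countdown satisfied, but the marker is not set
    marker.mark();          // e.g. set by a DatanodeMarkingAction elsewhere
    action.run("call-4");   // all constraints satisfied: the action fires
  }
}

Note that CountdownConstraint.isSatisfied() has a side effect (it decrements the count), so the short-circuit order of the constraints matters.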
Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/PipelineTest.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/PipelineTest.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/PipelineTest.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/PipelineTest.java Sat Nov 28 20:05:56 2009
@@ -19,7 +19,6 @@
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 
 /** A pipeline contains a list of datanodes. */
 public interface PipelineTest {

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Sat Nov 28 20:05:56 2009
@@ -22,12 +22,15 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream.DataStreamer;
-
+import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
 import org.junit.Assert;
 
 /** Aspects for DFSClient */
-public aspect DFSClientAspects {
+privileged public aspect DFSClientAspects {
   public static final Log LOG = LogFactory.getLog(DFSClientAspects.class);
 
   pointcut callCreateBlockOutputStream(DataStreamer datastreamer):
@@ -35,7 +38,7 @@
 
   before(DataStreamer datastreamer) : callCreateBlockOutputStream(datastreamer) {
     Assert.assertFalse(datastreamer.hasError);
-    Assert.assertEquals(0, datastreamer.errorIndex);
+    Assert.assertEquals(-1, datastreamer.errorIndex);
   }
 
   pointcut pipelineInitNonAppend(DataStreamer datastreamer):
@@ -48,8 +51,9 @@
         + datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
     try {
       if (datastreamer.hasError) {
-        DataTransferTestUtil.getDataTransferTest().fiPipelineInitErrorNonAppend
-            .run(datastreamer.errorIndex);
+        DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+        if (dtTest != null )
+          dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -66,22 +70,44 @@
         + " errorIndex=" + datastreamer.errorIndex);
   }
 
-  pointcut pipelineErrorAfterInit(boolean onError, boolean isAppend,
-      DataStreamer datastreamer):
-    call(* processDatanodeError(boolean, boolean))
-    && args(onError, isAppend)
-    && target(datastreamer)
-    && if(onError && !isAppend);
+  pointcut pipelineErrorAfterInit(DataStreamer datastreamer):
+    call(* processDatanodeError())
+    && within (DFSClient.DFSOutputStream.DataStreamer)
+    && target(datastreamer);
 
-  before(DataStreamer datastreamer) : pipelineErrorAfterInit(boolean, boolean, datastreamer) {
+  before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
     LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
         + datastreamer.errorIndex);
     try {
-      DataTransferTestUtil.getDataTransferTest().fiPipelineErrorAfterInit
-          .run(datastreamer.errorIndex);
+      DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+      if (dtTest != null )
+        dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
+  pointcut pipelineClose(DFSOutputStream out):
+    call(void flushInternal())
+    && withincode (void DFSOutputStream.close())
+    && this(out);
+
+  before(DFSOutputStream out) : pipelineClose(out) {
+    LOG.info("FI: before pipelineClose:");
+  }
+
+  pointcut checkAckQueue(DFSClient.DFSOutputStream.Packet cp):
+    call (void DFSClient.DFSOutputStream.waitAndQueuePacket(
+            DFSClient.DFSOutputStream.Packet))
+    && withincode (void DFSClient.DFSOutputStream.writeChunk(..))
+    && args(cp);
+
+  after(DFSClient.DFSOutputStream.Packet cp) : checkAckQueue (cp) {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest != null && pTest instanceof PipelinesTest) {
+      LOG.debug("FI: Recording packet # " + cp.seqno
+          + " where queuing has occurred");
+      ((PipelinesTest) pTest).setVerified(cp.seqno);
+    }
+  }
 }

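Most of the advice changes in this aspect follow a single idiom: fetch the currently installed DataTransferTest and dispatch to one of its ActionContainer fields only when a test is actually registered, so runs without fault injection pass straight through. A plain-Java sketch of that guarded dispatch is below; the method name is hypothetical, while the container field and the rethrow-as-RuntimeException behaviour mirror the advice above.

import java.io.IOException;

import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;

public class GuardedDispatchSketch {
  // Hypothetical hook: fire the fault-injection action for a pipeline
  // initialization error only when a DataTransferTest is installed.
  static void onPipelineInitError(int errorIndex) {
    final DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
    if (dtTest != null) {
      try {
        dtTest.fiPipelineInitErrorNonAppend.run(errorIndex);
      } catch (IOException e) {
        // The advice rethrows as unchecked; mirror that here.
        throw new RuntimeException(e);
      }
    }
  }
}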
Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj Sat Nov 28 20:05:56 2009
@@ -27,7 +27,7 @@
   public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class);
 
   pointcut addBlock():
-    call(LocatedBlock ClientProtocol.addBlock(String, String));
+    call(LocatedBlock ClientProtocol.addBlock(String, String,..));
 
   after() returning(LocatedBlock lb): addBlock() {
     PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest();

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Sat Nov 28 20:05:56 2009
@@ -17,21 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.DataInput;
 import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
 import org.apache.hadoop.fi.ProbabilityModel;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiver.PacketResponder;
+import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
+import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
  * This aspect takes care about faults injected into datanode.BlockReceiver 
  * class 
  */
-public aspect BlockReceiverAspects {
+privileged public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
@@ -44,11 +51,11 @@
 	
   before(BlockReceiver blockreceiver
       ) throws IOException : callReceivePacket(blockreceiver) {
-    LOG.info("FI: callReceivePacket");
+    final DatanodeRegistration dr = blockreceiver.getDataNode().getDatanodeRegistration();
+    LOG.info("FI: callReceivePacket, datanode=" + dr.getName());
     DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
     if (dtTest != null)
-      dtTest.fiCallReceivePacket.run(
-          blockreceiver.getDataNode().getDatanodeRegistration());
+      dtTest.fiCallReceivePacket.run(dr);
 
     if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) {
       LOG.info("Before the injection point");
@@ -57,4 +64,161 @@
         thisJoinPoint.getStaticPart( ).getSourceLocation());
     }
   }
+  
+  // Pointcuts and advice for TestFiPipelines
+  pointcut callSetNumBytes(BlockReceiver br, long offset) : 
+    call (void ReplicaInPipelineInterface.setNumBytes(long)) 
+    && withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
+    && args(offset) 
+    && this(br);
+  
+  after(BlockReceiver br, long offset) : callSetNumBytes(br, offset) {
+    LOG.debug("FI: Received bytes To: " + br.datanode.dnRegistration.getStorageID() + ": " + offset);
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest == null) {
+      LOG.debug("FI: no pipeline has been found in receiving");
+      return;
+    }
+    if (!(pTest instanceof PipelinesTest)) {
+      return;
+    }
+    NodeBytes nb = new NodeBytes(br.datanode.dnRegistration, offset);
+    try {
+      ((PipelinesTest)pTest).fiCallSetNumBytes.run(nb);
+    } catch (IOException e) {
+      LOG.fatal("FI: no exception is expected here!");
+    }
+  }
+  
+  // Pointcuts and advice for TestFiPipelines
+  pointcut callSetBytesAcked(PacketResponder pr, long acked) : 
+    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
+    && withincode (void PacketResponder.run())
+    && args(acked) 
+    && this(pr);
+
+  pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) : 
+    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
+    && withincode (void PacketResponder.lastDataNodeRun())
+    && args(acked) 
+    && this(pr);
+  
+  after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest == null) {
+      LOG.debug("FI: no pipeline has been found in acking");
+      return;
+    }
+    LOG.debug("FI: Acked total bytes from: " + 
+        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
+    if (pTest instanceof PipelinesTest) {
+      bytesAckedService((PipelinesTest)pTest, pr, acked);
+    }
+  }
+  after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest == null) {
+      LOG.debug("FI: no pipeline has been found in acking");
+      return;
+    }
+    LOG.debug("FI: Acked total bytes from (last DN): " + 
+        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
+    if (pTest instanceof PipelinesTest) {
+      bytesAckedService((PipelinesTest)pTest, pr, acked); 
+    }
+  }
+  
+  private void bytesAckedService 
+      (final PipelinesTest pTest, final PacketResponder pr, final long acked) {
+    NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
+    try {
+      pTest.fiCallSetBytesAcked.run(nb);
+    } catch (IOException e) {
+      LOG.fatal("No exception should be happening at this point");
+      assert false;
+    }
+  }
+  
+  pointcut preventAckSending () :
+    call (void ackReply(long)) 
+    && within (PacketResponder);
+
+  static int ackCounter = 0;
+  void around () : preventAckSending () {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+
+    if (pTest == null) { 
+      LOG.debug("FI: remove first ack as expected");
+      proceed();
+      return;
+    }
+    if (!(pTest instanceof PipelinesTest)) {
+      LOG.debug("FI: remove first ack as expected");
+      proceed();
+      return;
+    }
+    if (((PipelinesTest)pTest).getSuspend()) {
+        LOG.debug("FI: suspend the ack");
+        return;
+    }
+    LOG.debug("FI: remove first ack as expected");
+    proceed();
+  }
+  // End of pointcuts and advice for TestFiPipelines
+
+  pointcut pipelineClose(BlockReceiver blockreceiver, long offsetInBlock, long seqno,
+      boolean lastPacketInBlock, int len, int endOfHeader) :
+    call (* BlockReceiver.receivePacket(long, long, boolean, int, int))
+      && this(blockreceiver)
+      && args(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
+
+  before(BlockReceiver blockreceiver, long offsetInBlock, long seqno,
+      boolean lastPacketInBlock, int len, int endOfHeader
+      ) throws IOException : pipelineClose(blockreceiver, offsetInBlock, seqno,
+          lastPacketInBlock, len, endOfHeader) {
+    if (len == 0) {
+      final DatanodeRegistration dr = blockreceiver.getDataNode().getDatanodeRegistration();
+      LOG.info("FI: pipelineClose, datanode=" + dr.getName()
+          + ", offsetInBlock=" + offsetInBlock
+          + ", seqno=" + seqno
+          + ", lastPacketInBlock=" + lastPacketInBlock
+          + ", len=" + len
+          + ", endOfHeader=" + endOfHeader);
+  
+      final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();
+      if (test != null) {
+        test.fiPipelineClose.run(dr);
+      }
+    }
+  }
+
+  pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
+    call (Status Status.read(DataInput))
+      && this(packetresponder);
+
+  after(BlockReceiver.PacketResponder packetresponder) throws IOException
+      : pipelineAck(packetresponder) {
+    final DatanodeRegistration dr = packetresponder.receiver.getDataNode().getDatanodeRegistration();
+    LOG.info("FI: fiPipelineAck, datanode=" + dr);
+
+    final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();
+    if (test != null) {
+      test.fiPipelineAck.run(dr);
+    }
+  }
+
+  pointcut blockFileClose(BlockReceiver blockreceiver) :
+    call(void close())
+      && withincode(void BlockReceiver.close())
+      && this(blockreceiver);
+
+  after(BlockReceiver blockreceiver) throws IOException : blockFileClose(blockreceiver) {
+    final DatanodeRegistration dr = blockreceiver.getDataNode().getDatanodeRegistration();
+    LOG.info("FI: blockFileClose, datanode=" + dr);
+
+    final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();
+    if (test != null) {
+      test.fiBlockFileClose.run(dr);
+    }
+  }
 }

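The new pipelineAck, pipelineClose and blockFileClose pointcuts feed the ActionContainer fields added to DataTransferTest earlier in this commit, which lets a test coordinate faults across stages of a write. The sketch below shows one plausible combination built from DatanodeMarkingAction and MarkerConstraint, both introduced in this patch: datanode 1 is marked once it acks, and only after that does the close path start sleeping on the same datanode. The test name is made up, and registration on the ActionContainer fields is described in comments only, since that API is outside this hunk.

import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

public class PipelineCloseFaultSketch {
  public static void main(String[] args) {
    final String test = "sketchPipelineClose";   // hypothetical test name
    final MarkerConstraint acked = new MarkerConstraint("dn1-acked");

    // Would be attached to fiPipelineAck: sets the marker once datanode 1 acks.
    DatanodeMarkingAction markOnAck = new DatanodeMarkingAction(test, 1, acked);

    // Would be attached to fiPipelineClose: after the marker is set,
    // datanode 1 sleeps 1-2 seconds when the close packet arrives.
    ConstraintSatisfactionAction<DatanodeID> sleepOnClose =
        new ConstraintSatisfactionAction<DatanodeID>(
            new SleepAction(test, 1, 1000L, 2000L), acked);

    System.out.println(markOnAck);
    System.out.println(sleepOnClose);
  }
}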
Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Sat Nov 28 20:05:56 2009
@@ -60,8 +60,9 @@
     final DataNode d = dataxceiver.getDataNode();
     LOG.info("FI: statusRead " + status + ", datanode="
         + d.getDatanodeRegistration().getName());    
-    DataTransferTestUtil.getDataTransferTest().fiStatusRead.run(
-        d.getDatanodeRegistration());
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null) 
+      dtTest.fiStatusRead.run(d.getDatanodeRegistration());
   }
 
   pointcut receiverOpWriteBlock(DataXceiver dataxceiver):

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj Sat Nov 28 20:05:56 2009
@@ -35,11 +35,9 @@
   // the following will inject faults inside of the method in question 		
     execution (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
 
-  // the following will inject faults before the actual call of the method
-  // call (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
-
-  pointcut callCreateBlockWriteStream() : 
-    call (BlockWriteStreams FSDataset.createBlockWriteStreams(..)) 
+  pointcut callCreateBlockWriteStream(ReplicaInPipeline repl) : 
+    call (BlockWriteStreams createStreams(..))
+    && target (repl)
       && !within(FSDatasetAspects +);
 
   // This aspect specifies the logic of our fault point.
@@ -54,7 +52,7 @@
     }
   }
 
-  before() throws DiskOutOfSpaceException : callCreateBlockWriteStream() {
+  before(ReplicaInPipeline repl) throws DiskOutOfSpaceException : callCreateBlockWriteStream(repl) {
     if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) {
       LOG.info("Before the injection point");
       Thread.dumpStack();

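Alongside the DataTransferTest hooks, this aspect keeps the probability-driven injection path: ProbabilityModel.injectCriteria is consulted for the target class and, when it fires, a DiskOutOfSpaceException is raised at the join point. A stand-alone sketch of that pattern follows; the class and method names are invented for the example, and only the ProbabilityModel call and the exception type come from the surrounding code.

import org.apache.hadoop.fi.ProbabilityModel;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

public class ProbabilisticFaultSketch {
  // Hypothetical injection point: fail with a disk-full error with the
  // probability configured for this class name in the fault-injection config.
  static void maybeFailCreateStreams() throws DiskOutOfSpaceException {
    if (ProbabilityModel.injectCriteria(
        ProbabilisticFaultSketch.class.getSimpleName())) {
      Thread.dumpStack();   // mirror the advice: show where the fault landed
      throw new DiskOutOfSpaceException("FI: injected fault point (sketch)");
    }
  }
}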

