hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1399950 [12/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apac...
Date: Fri, 19 Oct 2012 02:28:07 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Oct 19 02:25:55 2012
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedInputStream;
@@ -29,6 +29,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
@@ -43,7 +44,10 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -82,9 +86,10 @@ class DataXceiver extends Receiver imple
   private final DataNode datanode;
   private final DNConf dnConf;
   private final DataXceiverServer dataXceiverServer;
-
+  private final boolean connectToDnViaHostname;
   private long opStartTime; //the start time of receiving an Op
-  private final SocketInputWrapper socketInputWrapper;
+  private final SocketInputWrapper socketIn;
+  private OutputStream socketOut;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -94,24 +99,21 @@ class DataXceiver extends Receiver imple
   
   public static DataXceiver create(Socket s, DataNode dn,
       DataXceiverServer dataXceiverServer) throws IOException {
-    
-    SocketInputWrapper iw = NetUtils.getInputStream(s);
-    return new DataXceiver(s, iw, dn, dataXceiverServer);
+    return new DataXceiver(s, dn, dataXceiverServer);
   }
   
   private DataXceiver(Socket s, 
-      SocketInputWrapper socketInput,
       DataNode datanode, 
       DataXceiverServer dataXceiverServer) throws IOException {
-    super(new DataInputStream(new BufferedInputStream(
-        socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
 
     this.s = s;
-    this.socketInputWrapper = socketInput;
+    this.dnConf = datanode.getDnConf();
+    this.socketIn = NetUtils.getInputStream(s);
+    this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
-    this.dnConf = datanode.getDnConf();
     this.dataXceiverServer = dataXceiverServer;
+    this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
 
@@ -141,15 +143,43 @@ class DataXceiver extends Receiver imple
 
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
+  
+  private OutputStream getOutputStream() throws IOException {
+    return socketOut;
+  }
 
   /**
    * Read/write data from/to the DataXceiverServer.
    */
+  @Override
   public void run() {
     int opsProcessed = 0;
     Op op = null;
+    
     dataXceiverServer.childSockets.add(s);
+    
     try {
+      
+      InputStream input = socketIn;
+      if (dnConf.encryptDataTransfer) {
+        IOStreamPair encryptedStreams = null;
+        try {
+          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
+              socketIn, datanode.blockPoolTokenSecretManager,
+              dnConf.encryptionAlgorithm);
+        } catch (InvalidMagicNumberException imne) {
+          LOG.info("Failed to read expected encryption handshake from client " +
+              "at " + s.getInetAddress() + ". Perhaps the client is running an " +
+              "older version of Hadoop which does not support encryption.");
+          return;
+        }
+        input = encryptedStreams.in;
+        socketOut = encryptedStreams.out;
+      }
+      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      
+      super.initialize(new DataInputStream(input));
+      
       // We process requests in a loop, and stay around for a short timeout.
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disable this behavior.
@@ -159,9 +189,9 @@ class DataXceiver extends Receiver imple
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+            socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
           } else {
-            socketInputWrapper.setTimeout(dnConf.socketTimeout);
+            socketIn.setTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -214,8 +244,7 @@ class DataXceiver extends Receiver imple
       final long length) throws IOException {
     previousOpClientName = clientName;
 
-    OutputStream baseStream = NetUtils.getOutputStream(s, 
-        dnConf.socketWriteTimeout);
+    OutputStream baseStream = getOutputStream();
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
     checkAccess(out, true, block, blockToken,
@@ -241,13 +270,12 @@ class DataXceiver extends Receiver imple
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
-        sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+        sendResponse(ERROR, msg);
         throw e;
       }
       
       // send op status
-      writeSuccessWithChecksumInfo(blockSender,
-          getStreamWithTimeout(s, dnConf.socketWriteTimeout));
+      writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
 
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
@@ -346,7 +374,7 @@ class DataXceiver extends Receiver imple
     // reply to upstream datanode or client 
     final DataOutputStream replyOut = new DataOutputStream(
         new BufferedOutputStream(
-            NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
+            getOutputStream(),
             HdfsConstants.SMALL_BUFFER_SIZE));
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
@@ -377,7 +405,10 @@ class DataXceiver extends Receiver imple
       if (targets.length > 0) {
         InetSocketAddress mirrorTarget = null;
         // Connect to backup machine
-        mirrorNode = targets[0].getXferAddr();
+        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to datanode " + mirrorNode);
+        }
         mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
         mirrorSock = datanode.newSocket();
         try {
@@ -388,11 +419,23 @@ class DataXceiver extends Receiver imple
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-          mirrorOut = new DataOutputStream(
-             new BufferedOutputStream(
-                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
-                         HdfsConstants.SMALL_BUFFER_SIZE));
-          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
+          
+          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
+              writeTimeout);
+          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
+          if (dnConf.encryptDataTransfer) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(
+                    unbufMirrorOut, unbufMirrorIn,
+                    datanode.blockPoolTokenSecretManager
+                        .generateDataEncryptionKey(block.getBlockPoolId()));
+            
+            unbufMirrorOut = encryptedStreams.out;
+            unbufMirrorIn = encryptedStreams.in;
+          }
+          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          mirrorIn = new DataInputStream(unbufMirrorIn);
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
@@ -418,7 +461,8 @@ class DataXceiver extends Receiver imple
           if (isClient) {
             BlockOpResponseProto.newBuilder()
               .setStatus(ERROR)
-              .setFirstBadLink(mirrorNode)
+               // NB: Unconditionally using the xfer addr w/o hostname
+              .setFirstBadLink(targets[0].getXferAddr())
               .build()
               .writeDelimitedTo(replyOut);
             replyOut.flush();
@@ -519,7 +563,7 @@ class DataXceiver extends Receiver imple
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+        getOutputStream());
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
       writeResponse(Status.SUCCESS, null, out);
@@ -532,7 +576,7 @@ class DataXceiver extends Receiver imple
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+        getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
@@ -565,6 +609,7 @@ class DataXceiver extends Receiver imple
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
+          .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
           )
         .build()
         .writeDelimitedTo(out);
@@ -592,7 +637,7 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_COPY_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
+        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
         return;
       }
 
@@ -602,7 +647,7 @@ class DataXceiver extends Receiver imple
       String msg = "Not able to copy block " + block.getBlockId() + " to " 
       + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.info(msg);
-      sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+      sendResponse(ERROR, msg);
       return;
     }
 
@@ -616,8 +661,7 @@ class DataXceiver extends Receiver imple
           null);
 
       // set up response stream
-      OutputStream baseStream = NetUtils.getOutputStream(
-          s, dnConf.socketWriteTimeout);
+      OutputStream baseStream = getOutputStream();
       reply = new DataOutputStream(new BufferedOutputStream(
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
@@ -669,8 +713,7 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_REPLACE_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
-            dnConf.socketWriteTimeout);
+        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
         return;
       }
     }
@@ -679,7 +722,7 @@ class DataXceiver extends Receiver imple
       String msg = "Not able to receive block " + block.getBlockId() + " from " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.warn(msg);
-      sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+      sendResponse(ERROR, msg);
       return;
     }
 
@@ -692,23 +735,38 @@ class DataXceiver extends Receiver imple
     
     try {
       // get the output stream to the proxy
-      InetSocketAddress proxyAddr =
-        NetUtils.createSocketAddr(proxySource.getXferAddr());
+      final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to datanode " + dnAddr);
+      }
+      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
       proxySock = datanode.newSocket();
       NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
       proxySock.setSoTimeout(dnConf.socketTimeout);
 
-      OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
+      OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
-      proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
+      InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+      if (dnConf.encryptDataTransfer) {
+        IOStreamPair encryptedStreams =
+            DataTransferEncryptor.getEncryptedStreams(
+                unbufProxyOut, unbufProxyIn,
+                datanode.blockPoolTokenSecretManager
+                    .generateDataEncryptionKey(block.getBlockPoolId()));
+        unbufProxyOut = encryptedStreams.out;
+        unbufProxyIn = encryptedStreams.in;
+      }
+      
+      proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));
+      proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+          HdfsConstants.IO_FILE_BUFFER_SIZE));
 
       /* send request to the proxy */
       new Sender(proxyOut).copyBlock(block, blockToken);
 
       // receive the response from the proxy
-      proxyReply = new DataInputStream(new BufferedInputStream(
-          NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
+      
       BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
           HdfsProtoUtil.vintPrefixed(proxyReply));
 
@@ -761,7 +819,7 @@ class DataXceiver extends Receiver imple
       
       // send response back
       try {
-        sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
+        sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
@@ -780,20 +838,13 @@ class DataXceiver extends Receiver imple
 
   /**
    * Utility function for sending a response.
-   * @param s socket to write to
+   * 
    * @param opStatus status message to write
-   * @param timeout send timeout
-   **/
-  private static void sendResponse(Socket s, Status status, String message,
-      long timeout) throws IOException {
-    DataOutputStream reply = getStreamWithTimeout(s, timeout);
-    
-    writeResponse(status, message, reply);
-  }
-  
-  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
-      throws IOException {
-    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+   * @param message message to send to the client or other DN
+   */
+  private void sendResponse(Status status,
+      String message) throws IOException {
+    writeResponse(status, message, getOutputStream());
   }
 
   private static void writeResponse(Status status, String message, OutputStream out)
@@ -849,6 +900,7 @@ class DataXceiver extends Receiver imple
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
               DatanodeRegistration dnR = 
                 datanode.getDNRegistrationForBP(blk.getBlockPoolId());
+              // NB: Unconditionally using the xfer addr w/o hostname
               resp.setFirstBadLink(dnR.getXferAddr());
             }
             resp.build().writeDelimitedTo(out);
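
Note on the DataXceiver changes above: stream construction moves out of the constructor and into run(), so that when dnConf.encryptDataTransfer is set the raw socket streams can first be wrapped by DataTransferEncryptor.getEncryptedStreams() and only then buffered; the same wrap-then-buffer order is applied to the mirror and proxy connections. An InvalidMagicNumberException during the handshake is logged and the connection dropped, which is how clients running an older, non-encrypting version of Hadoop are handled. The sketch below illustrates only that ordering; StreamPair and EncryptedStreamFactory are hypothetical stand-ins for IOStreamPair and DataTransferEncryptor, not the real HDFS classes.

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-in for IOStreamPair in the patch.
final class StreamPair {
  final InputStream in;
  final OutputStream out;
  StreamPair(InputStream in, OutputStream out) { this.in = in; this.out = out; }
}

// Hypothetical stand-in for DataTransferEncryptor.getEncryptedStreams().
interface EncryptedStreamFactory {
  StreamPair wrap(OutputStream rawOut, InputStream rawIn) throws IOException;
}

class StreamSetup {
  private static final int SMALL_BUFFER_SIZE = 512; // stand-in for HdfsConstants.SMALL_BUFFER_SIZE

  /**
   * Mirrors the ordering used in the patched DataXceiver#run():
   * optionally wrap the raw socket streams with encryption first,
   * then layer buffering and DataInputStream on top.
   */
  static DataInputStream setUpInput(InputStream rawIn, OutputStream rawOut,
      boolean encrypt, EncryptedStreamFactory factory) throws IOException {
    InputStream in = rawIn;
    if (encrypt) {
      StreamPair wrapped = factory.wrap(rawOut, rawIn);
      in = wrapped.in;   // the reply stream would likewise become wrapped.out
    }
    return new DataInputStream(new BufferedInputStream(in, SMALL_BUFFER_SIZE));
  }
}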

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Fri Oct 19 02:25:55 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.net.URLEncoder;
@@ -37,7 +36,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -45,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -60,6 +59,7 @@ public class DatanodeJspHelper {
                                                  InterruptedException {
     return
       user.doAs(new PrivilegedExceptionAction<DFSClient>() {
+        @Override
         public DFSClient run() throws IOException {
           return new DFSClient(NetUtils.createSocketAddr(addr), conf);
         }
@@ -139,7 +139,7 @@ public class DatanodeJspHelper {
           DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf);
           String fqdn = canonicalize(chosenNode.getIpAddr());
           int datanodePort = chosenNode.getXferPort();
-          String redirectLocation = "http://" + fqdn + ":"
+          String redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":"
               + chosenNode.getInfoPort() + "/browseBlock.jsp?blockId="
               + firstBlock.getBlock().getBlockId() + "&blockSize="
               + firstBlock.getBlock().getNumBytes() + "&genstamp="
@@ -219,7 +219,7 @@ public class DatanodeJspHelper {
         JspHelper.addTableFooter(out);
       }
     }
-    out.print("<br><a href=\"http://"
+    out.print("<br><a href=\"" + HttpConfig.getSchemePrefix()
         + canonicalize(nnAddr) + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
     dfs.close();
@@ -295,7 +295,7 @@ public class DatanodeJspHelper {
         Long.MAX_VALUE).getLocatedBlocks();
     // Add the various links for looking at the file contents
     // URL for downloading the full file
-    String downloadUrl = "http://" + req.getServerName() + ":"
+    String downloadUrl = HttpConfig.getSchemePrefix() + req.getServerName() + ":"
         + req.getServerPort() + "/streamFile" + ServletUtil.encodePath(filename)
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr, true)
         + JspHelper.getDelegationTokenUrlParam(tokenString);
@@ -313,7 +313,7 @@ public class DatanodeJspHelper {
       return;
     }
     String fqdn = canonicalize(chosenNode.getIpAddr());
-    String tailUrl = "http://" + fqdn + ":" + chosenNode.getInfoPort()
+    String tailUrl = HttpConfig.getSchemePrefix() + fqdn + ":" + chosenNode.getInfoPort()
         + "/tail.jsp?filename=" + URLEncoder.encode(filename, "UTF-8")
         + "&namenodeInfoPort=" + namenodeInfoPort
         + "&chunkSizeToView=" + chunkSizeToView
@@ -362,7 +362,7 @@ public class DatanodeJspHelper {
         String datanodeAddr = locs[j].getXferAddr();
         datanodePort = locs[j].getXferPort();
         fqdn = canonicalize(locs[j].getIpAddr());
-        String blockUrl = "http://" + fqdn + ":" + locs[j].getInfoPort()
+        String blockUrl = HttpConfig.getSchemePrefix() + fqdn + ":" + locs[j].getInfoPort()
             + "/browseBlock.jsp?blockId=" + blockidstring
             + "&blockSize=" + blockSize
             + "&filename=" + URLEncoder.encode(filename, "UTF-8")
@@ -373,7 +373,7 @@ public class DatanodeJspHelper {
             + JspHelper.getDelegationTokenUrlParam(tokenString)
             + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
 
-        String blockInfoUrl = "http://" + nnCanonicalName + ":"
+        String blockInfoUrl = HttpConfig.getSchemePrefix() + nnCanonicalName + ":"
             + namenodeInfoPort
             + "/block_info_xml.jsp?blockId=" + blockidstring;
         out.print("<td>&nbsp</td><td><a href=\"" + blockUrl + "\">"
@@ -384,7 +384,7 @@ public class DatanodeJspHelper {
     }
     out.println("</table>");
     out.print("<hr>");
-    out.print("<br><a href=\"http://"
+    out.print("<br><a href=\"" + HttpConfig.getSchemePrefix()
         + nnCanonicalName + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
     dfs.close();
@@ -484,7 +484,7 @@ public class DatanodeJspHelper {
     String parent = new File(filename).getParent();
     JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, parent, nnAddr);
     out.print("<hr>");
-    out.print("<a href=\"http://"
+    out.print("<a href=\"" + HttpConfig.getSchemePrefix()
         + req.getServerName() + ":" + req.getServerPort()
         + "/browseDirectory.jsp?dir=" + URLEncoder.encode(parent, "UTF-8")
         + "&namenodeInfoPort=" + namenodeInfoPort
@@ -532,7 +532,7 @@ public class DatanodeJspHelper {
     }
     String nextUrl = null;
     if (nextBlockIdStr != null) {
-      nextUrl = "http://" + canonicalize(nextHost) + ":" + nextPort
+      nextUrl = HttpConfig.getSchemePrefix() + canonicalize(nextHost) + ":" + nextPort
           + "/browseBlock.jsp?blockId=" + nextBlockIdStr
           + "&blockSize=" + nextBlockSize
           + "&startOffset=" + nextStartOffset
@@ -587,7 +587,7 @@ public class DatanodeJspHelper {
 
     String prevUrl = null;
     if (prevBlockIdStr != null) {
-      prevUrl = "http://" + canonicalize(prevHost) + ":" + prevPort
+      prevUrl = HttpConfig.getSchemePrefix() + canonicalize(prevHost) + ":" + prevPort
           + "/browseBlock.jsp?blockId=" + prevBlockIdStr
           + "&blockSize=" + prevBlockSize
           + "&startOffset=" + prevStartOffset
@@ -605,7 +605,7 @@ public class DatanodeJspHelper {
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
           datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
-          startOffset, chunkSizeToView, out, conf);
+          startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
     } catch (Exception e) {
       out.print(e);
     }
@@ -698,7 +698,7 @@ public class DatanodeJspHelper {
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
-        blockSize, startOffset, chunkSizeToView, out, conf);
+        blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
     out.print("</textarea>");
     dfs.close();
   }
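
Note on the DatanodeJspHelper changes above: every hard-coded "http://" prefix is replaced with HttpConfig.getSchemePrefix(), so the generated browse/tail/download links follow whichever scheme the web UI is configured for. A minimal sketch of the idea, using a hypothetical schemePrefix() helper rather than the real HttpConfig class (which reads the setting from the configuration):

/** Hypothetical illustration of the scheme-prefix pattern used in the patch. */
final class UiLinks {
  static String schemePrefix(boolean httpsEnabled) {
    return httpsEnabled ? "https://" : "http://";
  }

  static String browseBlockUrl(boolean httpsEnabled, String host, int infoPort, long blockId) {
    // Same shape as the patched JSP code: schemePrefix + fqdn + ":" + infoPort + path
    return schemePrefix(httpsEnabled) + host + ":" + infoPort
        + "/browseBlock.jsp?blockId=" + blockId;
  }
}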

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Oct 19 02:25:55 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
 
 /**
  * Periodically scans the data directories for block and block metadata files.
@@ -55,7 +56,6 @@ import org.apache.hadoop.util.Daemon;
 public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
 
-  private final DataNode datanode;
   private final FsDatasetSpi<?> dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
@@ -87,6 +87,7 @@ public class DirectoryScanner implements
       this.bpid = bpid;
     }
     
+    @Override
     public String toString() {
       return "BlockPool " + bpid
       + " Total blocks: " + totalBlocks + ", missing metadata files:"
@@ -220,8 +221,7 @@ public class DirectoryScanner implements
     }
   }
 
-  DirectoryScanner(DataNode dn, FsDatasetSpi<?> dataset, Configuration conf) {
-    this.datanode = dn;
+  DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
@@ -239,7 +239,7 @@ public class DirectoryScanner implements
   void start() {
     shouldRun = true;
     long offset = DFSUtil.getRandom().nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
-    long firstScanTime = System.currentTimeMillis() + offset;
+    long firstScanTime = Time.now() + offset;
     LOG.info("Periodic Directory Tree Verification scan starting at " 
         + firstScanTime + " with interval " + scanPeriodMsecs);
     masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, 
@@ -269,17 +269,6 @@ public class DirectoryScanner implements
         return;
       }
 
-      String[] bpids = dataset.getBlockPoolList();
-      for(String bpid : bpids) {
-        UpgradeManagerDatanode um = 
-          datanode.getUpgradeManagerDatanode(bpid);
-        if (um != null && !um.isUpgradeCompleted()) {
-          //If distributed upgrades underway, exit and wait for next cycle.
-          LOG.warn("this cycle terminating immediately because Distributed Upgrade is in process");
-          return; 
-        }
-      }
-      
       //We're are okay to run - do it
       reconcile();      
       
@@ -442,16 +431,16 @@ public class DirectoryScanner implements
   private Map<String, ScanInfo[]> getDiskReport() {
     // First get list of data directories
     final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
-    ArrayList<ScanInfoPerBlockPool> dirReports =
-      new ArrayList<ScanInfoPerBlockPool>(volumes.size());
-    
+
+    // Use an array since the threads may return out of order and
+    // compilersInProgress#keySet may return out of order as well.
+    ScanInfoPerBlockPool[] dirReports = new ScanInfoPerBlockPool[volumes.size()];
+
     Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
       new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
+
     for (int i = 0; i < volumes.size(); i++) {
-      if (!isValid(dataset, volumes.get(i))) {
-        // volume is invalid
-        dirReports.add(i, null);
-      } else {
+      if (isValid(dataset, volumes.get(i))) {
         ReportCompiler reportCompiler =
           new ReportCompiler(volumes.get(i));
         Future<ScanInfoPerBlockPool> result = 
@@ -463,7 +452,7 @@ public class DirectoryScanner implements
     for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
         compilersInProgress.entrySet()) {
       try {
-        dirReports.add(report.getKey(), report.getValue().get());
+        dirReports[report.getKey()] = report.getValue().get();
       } catch (Exception ex) {
         LOG.error("Error compiling report", ex);
         // Propagate ex to DataBlockScanner to deal with
@@ -476,7 +465,7 @@ public class DirectoryScanner implements
     for (int i = 0; i < volumes.size(); i++) {
       if (isValid(dataset, volumes.get(i))) {
         // volume is still valid
-        list.addAll(dirReports.get(i));
+        list.addAll(dirReports[i]);
       }
     }
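
Note on the DirectoryScanner changes above: in getDiskReport() the per-volume results now land in a fixed-size ScanInfoPerBlockPool[] indexed by volume rather than an ArrayList, since the report-compiler futures (and compilersInProgress keys) may be visited out of index order and ArrayList.add(index, value) only works when the earlier slots are already filled. A small, self-contained sketch of that pattern, with a trivial Callable standing in for ReportCompiler:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class IndexedReports {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    int volumes = 4;
    ExecutorService pool = Executors.newFixedThreadPool(2);

    // Results go into a pre-sized array indexed by volume, so completion
    // order does not matter; an ArrayList populated with add(i, value)
    // would depend on the futures being consumed in index order.
    String[] reports = new String[volumes];
    Map<Integer, Future<String>> inProgress = new HashMap<Integer, Future<String>>();

    for (int i = 0; i < volumes; i++) {
      final int volume = i;
      inProgress.put(i, pool.submit(new Callable<String>() {
        @Override
        public String call() {
          return "report for volume " + volume;   // stand-in for ReportCompiler
        }
      }));
    }

    for (Map.Entry<Integer, Future<String>> e : inProgress.entrySet()) {
      reports[e.getKey()] = e.getValue().get();   // the key, not iteration order, picks the slot
    }

    pool.shutdown();
    for (String r : reports) {
      System.out.println(r);
    }
  }
}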
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java Fri Oct 19 02:25:55 2012
@@ -16,11 +16,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
-
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.channels.ServerSocketChannel;
+import java.security.GeneralSecurityException;
 
 import org.apache.commons.daemon.Daemon;
 import org.apache.commons.daemon.DaemonContext;
@@ -28,9 +28,15 @@ import org.apache.hadoop.conf.Configurat
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+
+import javax.net.ssl.SSLServerSocketFactory;
 
 /**
  * Utility class to start a datanode in a secure cluster, first obtaining 
@@ -42,9 +48,9 @@ public class SecureDataNodeStarter imple
    */
   public static class SecureResources {
     private final ServerSocket streamingSocket;
-    private final SelectChannelConnector listener;
+    private final Connector listener;
     public SecureResources(ServerSocket streamingSocket,
-        SelectChannelConnector listener) {
+        Connector listener) {
 
       this.streamingSocket = streamingSocket;
       this.listener = listener;
@@ -52,12 +58,13 @@ public class SecureDataNodeStarter imple
 
     public ServerSocket getStreamingSocket() { return streamingSocket; }
 
-    public SelectChannelConnector getListener() { return listener; }
+    public Connector getListener() { return listener; }
   }
   
   private String [] args;
   private SecureResources resources;
-  
+  private SSLFactory sslFactory;
+
   @Override
   public void init(DaemonContext context) throws Exception {
     System.err.println("Initializing secure datanode resources");
@@ -82,13 +89,30 @@ public class SecureDataNodeStarter imple
     }
 
     // Obtain secure listener for web server
-    SelectChannelConnector listener = 
-                   (SelectChannelConnector)HttpServer.createDefaultChannelConnector();
+    Connector listener;
+    if (HttpConfig.isSecure()) {
+      sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+      try {
+        sslFactory.init();
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      SslSocketConnector sslListener = new SslSocketConnector() {
+        @Override
+        protected SSLServerSocketFactory createFactory() throws Exception {
+          return sslFactory.createSSLServerSocketFactory();
+        }
+      };
+      listener = sslListener;
+    } else {
+      listener = HttpServer.createDefaultChannelConnector();
+    }
+
     InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
     listener.setHost(infoSocAddr.getHostName());
     listener.setPort(infoSocAddr.getPort());
     // Open listener here in order to bind to port as root
-    listener.open(); 
+    listener.open();
     if (listener.getPort() != infoSocAddr.getPort()) {
       throw new RuntimeException("Unable to bind on specified info port in secure " +
           "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
@@ -111,6 +135,9 @@ public class SecureDataNodeStarter imple
     DataNode.secureMain(args, resources);
   }
   
-  @Override public void destroy() { /* Nothing to do */ }
+  @Override public void destroy() {
+    sslFactory.destroy();
+  }
+
   @Override public void stop() throws Exception { /* Nothing to do */ }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Oct 19 02:25:55 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
@@ -373,4 +374,16 @@ public interface FsDatasetSpi<V extends 
    */
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
       ) throws IOException;
+
+  /**
+   * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in 
+   * <code>blocks</code>.
+   * 
+   * @param blocks List of blocks for which to return metadata
+   * @return metadata Metadata for the list of blocks
+   * @throws IOException
+   */
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+      throws IOException;
+
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Oct 19 02:25:55 2012
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -78,6 +80,7 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -263,6 +266,7 @@ class FsDatasetImpl implements FsDataset
   /**
    * Return the number of failed volumes in the FSDataset.
    */
+  @Override
   public int getNumFailedVolumes() {
     return volumes.numberOfFailedVolumes();
   }
@@ -1142,7 +1146,7 @@ class FsDatasetImpl implements FsDataset
     }
     
     // Otherwise remove blocks for the failed volumes
-    long mlsec = System.currentTimeMillis();
+    long mlsec = Time.now();
     synchronized (this) {
       for (FsVolumeImpl fv: failedVols) {
         for (String bpid : fv.getBlockPoolList()) {
@@ -1161,7 +1165,7 @@ class FsDatasetImpl implements FsDataset
         }
       }
     } // end of sync
-    mlsec = System.currentTimeMillis() - mlsec;
+    mlsec = Time.now() - mlsec;
     LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
         "(took " + mlsec + " millisecs)");
 
@@ -1665,6 +1669,43 @@ class FsDatasetImpl implements FsDataset
         datafile.getAbsolutePath(), metafile.getAbsolutePath());
     return info;
   }
+  
+  @Override // FsDatasetSpi
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+      throws IOException {
+    // List of VolumeIds, one per volume on the datanode
+    List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
+    // List of indexes into the list of VolumeIds, pointing at the VolumeId of
+    // the volume that the block is on
+    List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
+    // Initialize the list of VolumeIds simply by enumerating the volumes
+    for (int i = 0; i < volumes.volumes.size(); i++) {
+      blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
+    }
+    // Determine the index of the VolumeId of each block's volume, by comparing 
+    // the block's volume against the enumerated volumes
+    for (int i = 0; i < blocks.size(); i++) {
+      ExtendedBlock block = blocks.get(i);
+      FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
+      boolean isValid = false;
+      int volumeIndex = 0;
+      for (FsVolumeImpl volume : volumes.volumes) {
+        // This comparison of references should be safe
+        if (blockVolume == volume) {
+          isValid = true;
+          break;
+        }
+        volumeIndex++;
+      }
+      // Indicates that the block is not present, or not found in a data dir
+      if (!isValid) {
+        volumeIndex = Integer.MAX_VALUE;
+      }
+      blocksVolumeIndexes.add(volumeIndex);
+    }
+    return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
+        blocksVolumeIds, blocksVolumeIndexes);
+  }
 
   @Override
   public RollingLogs createRollingLogs(String bpid, String prefix
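
Note on the FsDatasetImpl changes above: the new getHdfsBlocksMetadata() enumerates the datanode's volumes as 4-byte volume IDs and records, for each block, the index of the volume holding its replica, using Integer.MAX_VALUE when the volume cannot be found. A stripped-down sketch of that mapping step, with a hypothetical Volume type standing in for FsVolumeImpl:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class VolumeIndexSketch {
  static final class Volume { }   // hypothetical stand-in for FsVolumeImpl

  /** Enumerate the volumes as 4-byte IDs, as the patch does. */
  static List<byte[]> volumeIds(List<Volume> volumes) {
    List<byte[]> ids = new ArrayList<byte[]>(volumes.size());
    for (int i = 0; i < volumes.size(); i++) {
      ids.add(ByteBuffer.allocate(4).putInt(i).array());
    }
    return ids;
  }

  /**
   * Map one block's volume to its index in the enumerated list.
   * Reference equality is enough because both sides come from the same
   * in-memory volume list; Integer.MAX_VALUE marks "not found".
   */
  static int volumeIndex(Volume blockVolume, List<Volume> volumes) {
    int index = 0;
    for (Volume v : volumes) {
      if (v == blockVolume) {
        return index;
      }
      index++;
    }
    return Integer.MAX_VALUE;
  }
}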

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.metrics2.annota
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -74,19 +75,54 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
-
+  MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
+  
   @Metric MutableRate flushNanos;
+  MutableQuantiles[] flushNanosQuantiles;
+  
   @Metric MutableRate fsyncNanos;
+  MutableQuantiles[] fsyncNanosQuantiles;
   
   @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
   @Metric MutableRate sendDataPacketTransferNanos;
+  MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
+  
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
-  public DataNodeMetrics(String name, String sessionId) {
+  public DataNodeMetrics(String name, String sessionId, int[] intervals) {
     this.name = name;
     registry.tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    packetAckRoundTripTimeNanosQuantiles = new MutableQuantiles[len];
+    flushNanosQuantiles = new MutableQuantiles[len];
+    fsyncNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      packetAckRoundTripTimeNanosQuantiles[i] = registry.newQuantiles(
+          "packetAckRoundTripTimeNanos" + interval + "s",
+          "Packet Ack RTT in ns", "ops", "latency", interval);
+      flushNanosQuantiles[i] = registry.newQuantiles(
+          "flushNanos" + interval + "s", 
+          "Disk flush latency in ns", "ops", "latency", interval);
+      fsyncNanosQuantiles[i] = registry.newQuantiles(
+          "fsyncNanos" + interval + "s", "Disk fsync latency in ns", 
+          "ops", "latency", interval);
+      sendDataPacketBlockedOnNetworkNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketBlockedOnNetworkNanos" + interval + "s", 
+          "Time blocked on network while sending a packet in ns",
+          "ops", "latency", interval);
+      sendDataPacketTransferNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketTransferNanos" + interval + "s", 
+          "Time reading from disk and writing to network while sending " +
+          "a packet in ns", "ops", "latency", interval);
+    }
   }
 
   public static DataNodeMetrics create(Configuration conf, String dnName) {
@@ -94,8 +130,15 @@ public class DataNodeMetrics {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
-        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-'));
-    return ms.register(name, null, new DataNodeMetrics(name, sessionId));
+        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() 
+            : dnName.replace(':', '-'));
+
+    // Percentile measurement is off by default, by watching no intervals
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    
+    return ms.register(name, null, new DataNodeMetrics(name, sessionId,
+        intervals));
   }
 
   public String name() { return name; }
@@ -166,14 +209,23 @@ public class DataNodeMetrics {
 
   public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
     packetAckRoundTripTimeNanos.add(latencyNanos);
+    for (MutableQuantiles q : packetAckRoundTripTimeNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFlushNanos(long latencyNanos) {
     flushNanos.add(latencyNanos);
+    for (MutableQuantiles q : flushNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFsyncNanos(long latencyNanos) {
     fsyncNanos.add(latencyNanos);
+    for (MutableQuantiles q : fsyncNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void shutdown() {
@@ -196,12 +248,18 @@ public class DataNodeMetrics {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
-  
-  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
-    return sendDataPacketBlockedOnNetworkNanos;
+
+  public void addSendDataPacketBlockedOnNetworkNanos(long latencyNanos) {
+    sendDataPacketBlockedOnNetworkNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketBlockedOnNetworkNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
-  
-  public MutableRate getSendDataPacketTransferNanos() {
-    return sendDataPacketTransferNanos;
+
+  public void addSendDataPacketTransferNanos(long latencyNanos) {
+    sendDataPacketTransferNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketTransferNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 }
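
Note on the DataNodeMetrics changes above: each MutableRate gains a parallel MutableQuantiles array, one estimator per interval read from conf.getInts(DFS_METRICS_PERCENTILES_INTERVALS_KEY); with the key unset the array is empty, so percentile measurement stays off by default. A minimal sketch of the pattern for a single metric, using only the registry calls that appear in the patch (the class and metric names here are illustrative, not the real DataNodeMetrics fields):

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;

class LatencyQuantilesSketch {
  private final MetricsRegistry registry = new MetricsRegistry("example");
  private final MutableQuantiles[] flushNanosQuantiles;

  LatencyQuantilesSketch(int[] intervals) {
    // An empty intervals array (key unset) means no quantiles are kept.
    flushNanosQuantiles = new MutableQuantiles[intervals.length];
    for (int i = 0; i < intervals.length; i++) {
      flushNanosQuantiles[i] = registry.newQuantiles(
          "flushNanos" + intervals[i] + "s",
          "Disk flush latency in ns", "ops", "latency", intervals[i]);
    }
  }

  // Feed every configured estimator the same sample, as addFlushNanos() does.
  void addFlushNanos(long latencyNanos) {
    for (MutableQuantiles q : flushNanosQuantiles) {
      q.add(latencyNanos);
    }
  }
}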

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Fri Oct 19 02:25:55 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -40,7 +39,6 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -217,7 +215,7 @@ public class DatanodeWebHdfsMethods {
             fullpath, permission.getFsPermission(), 
             overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                 : EnumSet.of(CreateFlag.CREATE),
-            replication.getValue(conf), blockSize.getValue(conf), null, b), null);
+            replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
         IOUtils.copyBytes(in, out, b);
         out.close();
         out = null;
@@ -411,31 +409,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, dfsclient);
         throw ioe;
       }
-      final HdfsDataInputStream dis = in;
-      final StreamingOutput streaming = new StreamingOutput() {
-        @Override
-        public void write(final OutputStream out) throws IOException {
-          final Long n = length.getValue();
-          HdfsDataInputStream dfsin = dis;
-          DFSClient client = dfsclient;
-          try {
-            if (n == null) {
-              IOUtils.copyBytes(dfsin, out, b);
-            } else {
-              IOUtils.copyBytes(dfsin, out, n, false);
-            }
-            dfsin.close();
-            dfsin = null;
-            dfsclient.close();
-            client = null;
-          } finally {
-            IOUtils.cleanup(LOG, dfsin);
-            IOUtils.cleanup(LOG, client);
-          }
-        }
-      };
-
-      return Response.ok(streaming).type(
+      
+      final long n = length.getValue() != null? length.getValue()
+        : in.getVisibleLength() - offset.getValue();
+      return Response.ok(new OpenEntity(in, n, dfsclient)).type(
           MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Fri Oct 19 02:25:55 2012
@@ -283,8 +283,9 @@ public class JournalService implements J
         new JournalProtocolServerSideTranslatorPB(impl);
     BlockingService service = 
         JournalProtocolService.newReflectiveBlockingService(xlator);
-    return RPC.getServer(JournalProtocolPB.class, service,
-        address.getHostName(), address.getPort(), 1, false, conf, null);
+    return new RPC.Builder(conf).setProtocol(JournalProtocolPB.class)
+        .setInstance(service).setBindAddress(address.getHostName())
+        .setPort(address.getPort()).setNumHandlers(1).setVerbose(false).build();
   }
   
   private void verifyEpoch(long e) throws FencedException {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Fri Oct 19 02:25:55 2012
@@ -22,6 +22,7 @@ import java.util.Collection;
 
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
 /**
  * A JournalManager implementation that uses RPCs to log transactions
@@ -39,6 +40,20 @@ class BackupJournalManager implements Jo
   }
 
   @Override
+  public void format(NamespaceInfo nsInfo) {
+    // format() should only get called at startup, before any BNs
+    // can register with the NN.
+    throw new UnsupportedOperationException(
+        "BackupNode journal should never get formatted");
+  }
+  
+  @Override
+  public boolean hasSomeData() {
+    throw new UnsupportedOperationException();
+  }
+
+  
+  @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
     EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
         journalInfo);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Fri Oct 19 02:25:55 2012
@@ -364,6 +364,7 @@ public class BackupNode extends NameNode
     } else {
       nsInfo.validateStorage(storage);
     }
+    bnImage.initEditLog();
     setRegistration();
     NamenodeRegistration nnReg = null;
     while(!isStopRequested()) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java Fri Oct 19 02:25:55 2012
@@ -68,6 +68,7 @@ public class CancelDelegationTokenServle
     
     try {
       ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
         public Void run() throws Exception {
           nn.getRpcServer().cancelDelegationToken(token);
           return null;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Fri Oct 19 02:25:55 2012
@@ -72,6 +72,7 @@ public class CheckpointSignature extends
    * Get the cluster id from CheckpointSignature
    * @return the cluster id
    */
+  @Override
   public String getClusterID() {
     return clusterID;
   }
@@ -101,6 +102,7 @@ public class CheckpointSignature extends
     this.blockpoolID = blockpoolID;
   }
   
+  @Override
   public String toString() {
     return String.valueOf(layoutVersion) + FIELD_SEPARATOR
          + String.valueOf(namespaceID) + FIELD_SEPARATOR
@@ -111,12 +113,19 @@ public class CheckpointSignature extends
          + blockpoolID ;
   }
 
+  boolean storageVersionMatches(StorageInfo si) throws IOException {
+    return (layoutVersion == si.layoutVersion) && (cTime == si.cTime);
+  }
+
+  boolean isSameCluster(FSImage si) {
+    return namespaceID == si.getStorage().namespaceID &&
+      clusterID.equals(si.getClusterID()) &&
+      blockpoolID.equals(si.getBlockPoolID());
+  }
+
   void validateStorageInfo(FSImage si) throws IOException {
-    if(layoutVersion != si.getStorage().layoutVersion
-       || namespaceID != si.getStorage().namespaceID 
-       || cTime != si.getStorage().cTime
-       || !clusterID.equals(si.getClusterID())
-       || !blockpoolID.equals(si.getBlockPoolID())) {
+    if (!isSameCluster(si)
+        || !storageVersionMatches(si.getStorage())) {
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
           + " cTime = " + cTime
@@ -133,6 +142,7 @@ public class CheckpointSignature extends
   //
   // Comparable interface
   //
+  @Override
   public int compareTo(CheckpointSignature o) {
     return ComparisonChain.start()
       .compare(layoutVersion, o.layoutVersion)
@@ -145,6 +155,7 @@ public class CheckpointSignature extends
       .result();
   }
 
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof CheckpointSignature)) {
       return false;
@@ -152,6 +163,7 @@ public class CheckpointSignature extends
     return compareTo((CheckpointSignature)o) == 0;
   }
 
+  @Override
   public int hashCode() {
     return layoutVersion ^ namespaceID ^
             (int)(cTime ^ mostRecentCheckpointTxId ^ curSegmentTxId)

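validateStorageInfo() is now composed from two finer-grained predicates, isSameCluster() and storageVersionMatches(). A minimal sketch of a hypothetical caller (placed in the same package, since both helpers are package-private) that treats the two failure modes differently:

import java.io.IOException;

class CheckpointDecisionSketch {
  // Returns true if the image should be reloaded; throws if the signature
  // comes from a different cluster entirely.
  static boolean needsImageReload(CheckpointSignature sig, FSImage img)
      throws IOException {
    if (!sig.isSameCluster(img)) {
      throw new IOException("Checkpoint signature is from a different cluster");
    }
    return !sig.storageVersionMatches(img.getStorage());
  }
}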
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Fri Oct 19 02:25:55 2012
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import java.io.File;
 import java.io.IOException;
@@ -118,6 +118,7 @@ class Checkpointer extends Daemon {
   //
   // The main work loop
   //
+  @Override
   public void run() {
     // Check the size of the edit log once every 5 minutes.
     long periodMSec = 5 * 60;   // 5 minutes

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Fri Oct 19 02:25:55 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -656,6 +657,7 @@ class ClusterJspHelper {
       this.value = v;
     }
 
+    @Override
     public String toString() {
       return value;
     }
@@ -822,7 +824,7 @@ class ClusterJspHelper {
     doc.startTag("item");
     doc.attribute("label", label);
     doc.attribute("value", value);
-    doc.attribute("link", "http://" + url);
+    doc.attribute("link", HttpConfig.getSchemePrefix() + url);
     doc.endTag(); // item
   }
 
@@ -882,7 +884,7 @@ class ClusterJspHelper {
 
   private static String queryMbean(String httpAddress, Configuration conf) 
     throws IOException {
-    URL url = new URL("http://"+httpAddress+JMX_QRY);
+    URL url = new URL(HttpConfig.getSchemePrefix() + httpAddress+JMX_QRY);
     return readOutput(url);
   }
   /**

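Both hunks above replace a hard-coded "http://" prefix with HttpConfig.getSchemePrefix(), so generated links follow the configured HTTP policy. A minimal sketch of the same pattern (the JMX query string below is illustrative, not the JMX_QRY constant from ClusterJspHelper):

import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.http.HttpConfig;

class SchemeAwareUrlSketch {
  private static final String JMX_QUERY =
      "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo";

  static URL jmxUrl(String hostPort) throws MalformedURLException {
    // Yields "http://host:port/..." or "https://host:port/..." depending on
    // the cluster-wide HTTP configuration.
    return new URL(HttpConfig.getSchemePrefix() + hostPort + JMX_QUERY);
  }
}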
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Oct 19 02:25:55 2012
@@ -114,7 +114,7 @@ class EditLogBackupOutputStream extends 
   }
 
   @Override // EditLogOutputStream
-  protected void flushAndSync() throws IOException {
+  protected void flushAndSync(boolean durable) throws IOException {
     assert out.getLength() == 0 : "Output buffer is not empty";
     
     int numReadyTxns = doubleBuf.countReadyTxns();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Fri Oct 19 02:25:55 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -367,30 +368,36 @@ public class EditLogFileInputStream exte
 
     @Override
     public InputStream getInputStream() throws IOException {
-      HttpURLConnection connection = (HttpURLConnection)
-          SecurityUtil.openSecureHttpConnection(url);
-      
-      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        throw new HttpGetFailedException(
-            "Fetch of " + url +
-            " failed with status code " + connection.getResponseCode() +
-            "\nResponse message:\n" + connection.getResponseMessage(),
-            connection);
-      }
-
-      String contentLength = connection.getHeaderField(CONTENT_LENGTH);
-      if (contentLength != null) {
-        advertisedSize = Long.parseLong(contentLength);
-        if (advertisedSize <= 0) {
-          throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
-              contentLength);
-        }
-      } else {
-        throw new IOException(CONTENT_LENGTH + " header is not provided " +
-                              "by the server when trying to fetch " + url);
-      }
-
-      return connection.getInputStream();
+      return SecurityUtil.doAsCurrentUser(
+          new PrivilegedExceptionAction<InputStream>() {
+            @Override
+            public InputStream run() throws IOException {
+              HttpURLConnection connection = (HttpURLConnection)
+                  SecurityUtil.openSecureHttpConnection(url);
+              
+              if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                throw new HttpGetFailedException(
+                    "Fetch of " + url +
+                    " failed with status code " + connection.getResponseCode() +
+                    "\nResponse message:\n" + connection.getResponseMessage(),
+                    connection);
+              }
+        
+              String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+              if (contentLength != null) {
+                advertisedSize = Long.parseLong(contentLength);
+                if (advertisedSize <= 0) {
+                  throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+                      contentLength);
+                }
+              } else {
+                throw new IOException(CONTENT_LENGTH + " header is not provided " +
+                                      "by the server when trying to fetch " + url);
+              }
+        
+              return connection.getInputStream();
+            }
+          });
     }
 
     @Override

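The fetch is now wrapped in SecurityUtil.doAsCurrentUser() so the HTTP connection is opened under the current login context (e.g. the NameNode's Kerberos credentials). A minimal standalone sketch of that pattern, trimmed of the Content-Length handling above:

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.SecurityUtil;

class PrivilegedFetchSketch {
  static InputStream open(final URL url) throws IOException {
    return SecurityUtil.doAsCurrentUser(
        new PrivilegedExceptionAction<InputStream>() {
          @Override
          public InputStream run() throws IOException {
            HttpURLConnection conn =
                (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
              throw new IOException("Fetch of " + url + " failed with status "
                  + conn.getResponseCode());
            }
            return conn.getInputStream();
          }
        });
  }
}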
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Oct 19 02:25:55 2012
@@ -49,6 +49,8 @@ public class EditLogFileOutputStream ext
   private EditsDoubleBuffer doubleBuf;
   static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
 
+  private static boolean shouldSkipFsyncForTests = false;
+
   static {
     fill.position(0);
     for (int i = 0; i < fill.capacity(); i++) {
@@ -174,7 +176,7 @@ public class EditLogFileOutputStream ext
    * accumulates new log records while readyBuffer will be flushed and synced.
    */
   @Override
-  public void flushAndSync() throws IOException {
+  public void flushAndSync(boolean durable) throws IOException {
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
@@ -184,7 +186,9 @@ public class EditLogFileOutputStream ext
     }
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
-    fc.force(false); // metadata updates not needed
+    if (durable && !shouldSkipFsyncForTests) {
+      fc.force(false); // metadata updates not needed
+    }
   }
 
   /**
@@ -247,4 +251,15 @@ public class EditLogFileOutputStream ext
   public FileChannel getFileChannelForTesting() {
     return fc;
   }
+  
+  /**
+   * For the purposes of unit tests, we don't need to actually
+   * write durably to disk. So, we can skip the fsync() calls
+   * for a speed improvement.
+   * @param skip true if fsync should <em>not</em> be called
+   */
+  @VisibleForTesting
+  public static void setShouldSkipFsyncForTesting(boolean skip) {
+    shouldSkipFsyncForTests = skip;
+  }
 }

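setShouldSkipFsyncForTesting() gives tests a switch to avoid real fsync() calls. A minimal sketch of how a unit test might use it (the test body is a placeholder):

class FastEditLogTestSketch {
  public void testManyEdits() throws Exception {
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
    try {
      // ... run an edit-log-heavy workload without paying for durability ...
    } finally {
      // Restore the default so later tests still exercise real fsync behavior.
      EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
    }
  }
}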
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Fri Oct 19 02:25:55 2012
@@ -22,9 +22,6 @@ import org.apache.hadoop.classification.
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 /**
  * A generic abstract class to support reading edits log data from 
  * persistent storage.
@@ -57,6 +54,7 @@ public abstract class EditLogInputStream
    * Close the stream.
    * @throws IOException if an error occurred while closing
    */
+  @Override
   public abstract void close() throws IOException;
 
   /** 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Oct 19 02:25:55 2012
@@ -20,10 +20,11 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.Closeable;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.jasper.compiler.JspUtil;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
@@ -74,6 +75,7 @@ public abstract class EditLogOutputStrea
    * @throws IOException if the journal can't be closed,
    *         or if there are unflushed edits
    */
+  @Override
   abstract public void close() throws IOException;
 
   /**
@@ -91,18 +93,24 @@ public abstract class EditLogOutputStrea
   /**
    * Flush and sync all data that is ready to be flushed
    * (see {@link #setReadyToFlush()}) into the underlying persistent store.
+   * @param durable if true, the edits should be made truly durable before
+   * returning
    * @throws IOException
    */
-  abstract protected void flushAndSync() throws IOException;
+  abstract protected void flushAndSync(boolean durable) throws IOException;
 
   /**
    * Flush data to persistent store.
    * Collect sync metrics.
    */
   public void flush() throws IOException {
+    flush(true);
+  }
+  
+  public void flush(boolean durable) throws IOException {
     numSync++;
     long start = now();
-    flushAndSync();
+    flushAndSync(durable);
     long end = now();
     totalTimeSync += (end - start);
   }
@@ -131,4 +139,12 @@ public abstract class EditLogOutputStrea
   protected long getNumSync() {
     return numSync;
   }
+
+  /**
+   * @return a short HTML snippet suitable for describing the current
+   * status of the stream
+   */
+  public String generateHtmlReport() {
+    return JspUtil.escapeXml(this.toString());
+  }
 }

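flush() now delegates to flush(true), while flush(false) lets a journal push buffered edits without forcing them to stable storage. A minimal sketch of a hypothetical caller that only pays for durability on the final batch:

import java.io.IOException;

class BatchedFlushSketch {
  static void flushBatches(EditLogOutputStream out, int batches) throws IOException {
    for (int i = 0; i < batches; i++) {
      // ... write one batch of ops to 'out' ...
      out.setReadyToFlush();
      boolean last = (i == batches - 1);
      out.flush(last);   // durable fsync only on the last batch
    }
  }
}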
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Oct 19 02:25:55 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
  * to progress concurrently to flushes without allocating new buffers each
  * time.
  */
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
 
   private TxnBuffer bufCurrent; // current buffer for writing
   private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
     bufCurrent.writeOp(op);
   }
 
-  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     bufCurrent.write(bytes, offset, length);
   }
   
-  void close() throws IOException {
+  public void close() throws IOException {
     Preconditions.checkNotNull(bufCurrent);
     Preconditions.checkNotNull(bufReady);
 
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
     bufCurrent = bufReady = null;
   }
   
-  void setReadyToFlush() {
+  public void setReadyToFlush() {
     assert isFlushed() : "previous data not flushed yet";
     TxnBuffer tmp = bufReady;
     bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
    * Writes the content of the "ready" buffer to the given output stream,
    * and resets it. Does not swap any buffers.
    */
-  void flushTo(OutputStream out) throws IOException {
+  public void flushTo(OutputStream out) throws IOException {
     bufReady.writeTo(out); // write data to file
     bufReady.reset(); // erase all data in the buffer
   }
   
-  boolean shouldForceSync() {
+  public boolean shouldForceSync() {
     return bufCurrent.size() >= initBufferSize;
   }
 
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
     return bufReady.numTxns;
   }
 
+  /**
+   * @return the number of bytes that are ready to be flushed
+   */
+  public int countReadyBytes() {
+    return bufReady.size();
+  }
   
   private static class TxnBuffer extends DataOutputBuffer {
     long firstTxId;

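With the class and its write/flush methods now public, external journal implementations can drive the double-buffer cycle directly. A minimal sketch of write, swap, inspect, drain (assuming the constructor is accessible from the calling code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

class DoubleBufferSketch {
  static void demo(byte[] serializedOps) throws IOException {
    EditsDoubleBuffer buf = new EditsDoubleBuffer(1024 * 1024);
    buf.writeRaw(serializedOps, 0, serializedOps.length);

    buf.setReadyToFlush();                 // swap current <-> ready
    int pending = buf.countReadyBytes();   // new accessor added above

    ByteArrayOutputStream sink = new ByteArrayOutputStream(pending);
    buf.flushTo(sink);                     // drain and reset the ready buffer
    buf.close();
  }
}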
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Oct 19 02:25:55 2012
@@ -32,8 +32,16 @@ public interface FSClusterStats {
    * @return a count of the total number of block transfers and block
    *         writes that are currently occurring on the cluster.
    */
-
-  public int getTotalLoad() ;
+  public int getTotalLoad();
+  
+  /**
+   * Indicate whether or not the cluster is now avoiding the use of
+   * stale DataNodes for writing.
+   * 
+   * @return True if the cluster is currently avoiding using stale DataNodes 
+   *         for writing targets, and false otherwise.
+   */
+  public boolean isAvoidingStaleDataNodesForWrite();
 }
     
     

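isAvoidingStaleDataNodesForWrite() lets callers ask whether stale DataNodes should currently be excluded as write targets. A minimal sketch of a purely hypothetical consumer (illustrative Node type, not the real placement policy):

import java.util.ArrayList;
import java.util.List;

class StaleAwareChooserSketch {
  static class Node {                  // hypothetical stand-in for a datanode descriptor
    long lastHeartbeatMs;
  }

  static List<Node> writeTargets(FSClusterStats stats, List<Node> candidates,
      long staleIntervalMs, long nowMs) {
    if (!stats.isAvoidingStaleDataNodesForWrite()) {
      return candidates;               // avoidance currently disabled
    }
    List<Node> fresh = new ArrayList<Node>();
    for (Node n : candidates) {
      if (nowMs - n.lastHeartbeatMs <= staleIntervalMs) {
        fresh.add(n);
      }
    }
    return fresh;
  }
}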
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import java.io.Closeable;
 import java.io.FileNotFoundException;
@@ -187,6 +187,7 @@ public class FSDirectory implements Clos
   /**
    * Shutdown the filestore
    */
+  @Override
   public void close() throws IOException {
     fsImage.close();
   }
@@ -212,8 +213,9 @@ public class FSDirectory implements Clos
 
   /**
    * Add the given filename to the fs.
-   * @throws QuotaExceededException 
-   * @throws FileAlreadyExistsException 
+   * @throws FileAlreadyExistsException
+   * @throws QuotaExceededException
+   * @throws UnresolvedLinkException
    */
   INodeFileUnderConstruction addFile(String path, 
                 PermissionStatus permissions,
@@ -229,8 +231,15 @@ public class FSDirectory implements Clos
 
     // Always do an implicit mkdirs for parent directory tree.
     long modTime = now();
-    if (!mkdirs(new Path(path).getParent().toString(), permissions, true,
-        modTime)) {
+    
+    Path parent = new Path(path).getParent();
+    if (parent == null) {
+      // Trying to add "/" as a file - this path has no
+      // parent -- avoids an NPE below.
+      return null;
+    }
+    
+    if (!mkdirs(parent.toString(), permissions, true, modTime)) {
       return null;
     }
     INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
@@ -257,8 +266,6 @@ public class FSDirectory implements Clos
     return newNode;
   }
 
-  /**
-   */
   INode unprotectedAddFile( String path, 
                             PermissionStatus permissions,
                             short replication,
@@ -267,8 +274,7 @@ public class FSDirectory implements Clos
                             long preferredBlockSize,
                             boolean underConstruction,
                             String clientName,
-                            String clientMachine)
-      throws UnresolvedLinkException {
+                            String clientMachine) {
     INode newNode;
     assert hasWriteLock();
     if (underConstruction) {
@@ -284,13 +290,18 @@ public class FSDirectory implements Clos
     try {
       newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
     } catch (IOException e) {
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "DIR* FSDirectory.unprotectedAddFile: exception when adding " + path
+                + " to the file system", e);
+      }
       return null;
     }
     return newNode;
   }
 
   INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
-      INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
+      INode newNode, boolean propagateModTime) {
     // NOTE: This does not update space counts for parents
     INodeDirectory newParent = null;
     writeLock();
@@ -337,13 +348,13 @@ public class FSDirectory implements Clos
 
       // check quota limits and updated space consumed
       updateCount(inodes, inodes.length-1, 0,
-          fileINode.getPreferredBlockSize()*fileINode.getReplication(), true);
+          fileINode.getPreferredBlockSize()*fileINode.getBlockReplication(), true);
 
       // associate new last block for the file
       BlockInfoUnderConstruction blockInfo =
         new BlockInfoUnderConstruction(
             block,
-            fileINode.getReplication(),
+            fileINode.getBlockReplication(),
             BlockUCState.UNDER_CONSTRUCTION,
             targets);
       getBlockManager().addBlockCollection(blockInfo, fileINode);
@@ -434,7 +445,7 @@ public class FSDirectory implements Clos
     // update space consumed
     INode[] pathINodes = getExistingPathINodes(path);
     updateCount(pathINodes, pathINodes.length-1, 0,
-        -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
+        -fileNode.getPreferredBlockSize()*fileNode.getBlockReplication(), true);
   }
 
   /**
@@ -813,7 +824,7 @@ public class FSDirectory implements Clos
       return null;
     }
     INodeFile fileNode = (INodeFile)inode;
-    final short oldRepl = fileNode.getReplication();
+    final short oldRepl = fileNode.getBlockReplication();
 
     // check disk quota
     long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
@@ -2053,7 +2064,7 @@ public class FSDirectory implements Clos
      if (node instanceof INodeFile) {
        INodeFile fileNode = (INodeFile)node;
        size = fileNode.computeFileSize(true);
-       replication = fileNode.getReplication();
+       replication = fileNode.getBlockReplication();
        blocksize = fileNode.getPreferredBlockSize();
      }
      return new HdfsFileStatus(
@@ -2083,7 +2094,7 @@ public class FSDirectory implements Clos
       if (node instanceof INodeFile) {
         INodeFile fileNode = (INodeFile)node;
         size = fileNode.computeFileSize(true);
-        replication = fileNode.getReplication();
+        replication = fileNode.getBlockReplication();
         blocksize = fileNode.getPreferredBlockSize();
         loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
             fileNode.getBlocks(), fileNode.computeFileSize(false),

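The new guard in addFile() exists because Path.getParent() returns null at the root, so the old unconditional mkdirs(parent.toString(), ...) would NPE when "/" itself was requested as a file. A minimal sketch of that behavior:

import org.apache.hadoop.fs.Path;

class RootParentCheckSketch {
  static boolean hasParent(String pathStr) {
    Path parent = new Path(pathStr).getParent();
    return parent != null;             // false for "/", true for "/a", "/a/b", ...
  }

  public static void main(String[] args) {
    System.out.println(hasParent("/"));      // false -> addFile bails out early
    System.out.println(hasParent("/user"));  // true  -> implicit mkdirs proceeds
  }
}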

